/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "perfetto/tracing/core/startup_trace_writer.h"

#include <algorithm>
#include <cstring>
#include <mutex>
#include <numeric>
#include <utility>

#include "perfetto/base/logging.h"
#include "perfetto/protozero/proto_utils.h"
#include "perfetto/trace/trace_packet.pbzero.h"
#include "perfetto/tracing/core/shared_memory_abi.h"
#include "perfetto/tracing/core/startup_trace_writer_registry.h"
#include "src/tracing/core/patch_list.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"

using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;

namespace perfetto {

namespace {

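// Requests a new chunk from the shared memory arbiter and initializes its
// header with the given writer and chunk IDs. If |fragmenting_packet| is set,
// the chunk starts with the continuation of a packet from the previous chunk.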
SharedMemoryABI::Chunk NewChunk(SharedMemoryArbiterImpl* arbiter,
                                WriterID writer_id,
                                ChunkID chunk_id,
                                bool fragmenting_packet) {
  ChunkHeader::Packets packets = {};
  if (fragmenting_packet) {
    packets.count = 1;
    packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
  }

  // The memory order of the stores below doesn't really matter. This |header|
  // is just a local temporary object. The GetNewChunk() call below will copy it
  // into the shared buffer with the proper barriers.
  ChunkHeader header = {};
  header.writer_id.store(writer_id, std::memory_order_relaxed);
  header.chunk_id.store(chunk_id, std::memory_order_relaxed);
  header.packets.store(packets, std::memory_order_relaxed);

  return arbiter->GetNewChunk(header);
}

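// Sequentially reads back the used bytes of a ScatteredHeapBuffer, slice by
// slice, so that locally buffered packets can be copied into SMB chunks.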
class LocalBufferReader {
 public:
  LocalBufferReader(protozero::ScatteredHeapBuffer* buffer)
      : buffer_slices_(buffer->slices()), cur_slice_(buffer_slices_.begin()) {}

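  // Copies up to |num_bytes| from the local buffer into |target_chunk| at
  // offset |cur_payload_size|, advancing across slices as needed. Returns the
  // number of bytes copied, which may be less than |num_bytes| if the local
  // buffer is exhausted.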
  size_t ReadBytes(SharedMemoryABI::Chunk* target_chunk,
                   size_t num_bytes,
                   size_t cur_payload_size) {
    PERFETTO_CHECK(target_chunk->payload_size() >=
                   num_bytes + cur_payload_size);
    uint8_t* target_ptr = target_chunk->payload_begin() + cur_payload_size;
    size_t bytes_read = 0;
    while (bytes_read < num_bytes) {
      if (cur_slice_ == buffer_slices_.end())
        return bytes_read;

      auto cur_slice_range = cur_slice_->GetUsedRange();

      if (cur_slice_range.size() == cur_slice_offset_) {
        cur_slice_offset_ = 0;
        cur_slice_++;
        continue;
      }

      size_t read_size = std::min(num_bytes - bytes_read,
                                  cur_slice_range.size() - cur_slice_offset_);
      memcpy(target_ptr + bytes_read, cur_slice_range.begin + cur_slice_offset_,
             read_size);
      cur_slice_offset_ += read_size;
      bytes_read += read_size;

      // Should have either read all of the slice or completed reading now.
      PERFETTO_DCHECK(cur_slice_offset_ == cur_slice_range.size() ||
                      bytes_read == num_bytes);
    }
    return bytes_read;
  }

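  // Returns the total number of used bytes across all slices of the buffer.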
  size_t TotalUsedSize() const {
    size_t used_size = 0;
    for (const auto& slice : buffer_slices_) {
      used_size += slice.GetUsedRange().size();
    }
    return used_size;
  }

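  // Returns true if the reader has consumed all used bytes of the buffer.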
  bool DidReadAllData() const {
    if (cur_slice_ == buffer_slices_.end())
      return true;

    const auto next_slice = cur_slice_ + 1;
    return next_slice == buffer_slices_.end() &&
           cur_slice_->GetUsedRange().size() == cur_slice_offset_;
  }

 private:
  const std::vector<protozero::ScatteredHeapBuffer::Slice>& buffer_slices_;

  // Iterator pointing to the slice in |buffer_slices_| that we're currently
  // reading from.
  std::vector<protozero::ScatteredHeapBuffer::Slice>::const_iterator cur_slice_;
  // Read offset into the current slice in bytes.
  size_t cur_slice_offset_ = 0;
};

}  // namespace

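// Creates an unbound writer that buffers packets in a local heap buffer until
// BindToArbiter() is called.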
StartupTraceWriter::StartupTraceWriter(
    std::shared_ptr<StartupTraceWriterRegistryHandle> registry_handle)
    : registry_handle_(std::move(registry_handle)),
      memory_buffer_(new protozero::ScatteredHeapBuffer()),
      memory_stream_writer_(
          new protozero::ScatteredStreamWriter(memory_buffer_.get())) {
  memory_buffer_->set_writer(memory_stream_writer_.get());
  PERFETTO_DETACH_FROM_THREAD(writer_thread_checker_);
}

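// Creates a writer that is already bound and simply proxies to the given
// TraceWriter.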
StartupTraceWriter::StartupTraceWriter(
    std::unique_ptr<TraceWriter> trace_writer)
    : was_bound_(true), trace_writer_(std::move(trace_writer)) {}

StartupTraceWriter::~StartupTraceWriter() {
  if (registry_handle_)
    registry_handle_->OnWriterDestroyed(this);
}

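// Binds this writer to the arbiter's shared memory buffer, copying any locally
// buffered packets into SMB chunks. Returns false (leaving the writer unbound)
// if a write on the writer thread is currently in progress.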
bool StartupTraceWriter::BindToArbiter(SharedMemoryArbiterImpl* arbiter,
                                       BufferID target_buffer) {
  // Create and destroy the trace writer without holding the lock, since this
  // will post a task and task posting may trigger a trace event, which would
  // cause a deadlock. This may create a few more trace writers than necessary
  // in cases where a concurrent write is in progress (other than causing some
  // computational overhead, this is not problematic).
  auto trace_writer = arbiter->CreateTraceWriter(target_buffer);

  {
    std::lock_guard<std::mutex> lock(lock_);

    PERFETTO_DCHECK(!trace_writer_);

    // Can't bind while the writer thread is writing.
    if (write_in_progress_)
      return false;

    // If there's a pending trace packet, it should have been completed by the
    // writer thread before |write_in_progress_| was reset.
    if (cur_packet_) {
      PERFETTO_DCHECK(cur_packet_->is_finalized());
      cur_packet_.reset();
    }

    trace_writer_ = std::move(trace_writer);
    ChunkID next_chunk_id = CommitLocalBufferChunks(
        arbiter, trace_writer_->writer_id(), target_buffer);

    // The real TraceWriter should start writing at the subsequent chunk ID.
    bool success = trace_writer_->SetFirstChunkId(next_chunk_id);
    PERFETTO_DCHECK(success);

    memory_stream_writer_.reset();
    memory_buffer_.reset();
  }

  return true;
}

TraceWriter::TracePacketHandle StartupTraceWriter::NewTracePacket() {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);

  // Check if we are already bound without grabbing the lock. This is an
  // optimization to avoid any locking in the common case where the proxy was
  // bound some time ago.
  if (PERFETTO_LIKELY(was_bound_)) {
    PERFETTO_DCHECK(!cur_packet_);
    PERFETTO_DCHECK(trace_writer_);
    return trace_writer_->NewTracePacket();
  }

  // Now grab the lock and safely check whether we are still unbound.
  {
    std::unique_lock<std::mutex> lock(lock_);
    if (trace_writer_) {
      PERFETTO_DCHECK(!cur_packet_);
      // Set the |was_bound_| flag to avoid locking in future calls to
      // NewTracePacket().
      was_bound_ = true;
      // Don't hold the lock while calling NewTracePacket() on |trace_writer_|.
      // This is safe because |trace_writer_| remains valid once set. It also
      // avoids deadlocks that may be caused by holding the lock while waiting
      // for a new SMB chunk in |trace_writer_|.
      lock.unlock();
      return trace_writer_->NewTracePacket();
    }
    // Not bound. Make sure it stays this way until the TracePacketHandle goes
    // out of scope by setting |write_in_progress_|.
    PERFETTO_DCHECK(!write_in_progress_);
    write_in_progress_ = true;
  }

  // Write to the local buffer.
  if (cur_packet_) {
    // If we hit this, the caller is calling NewTracePacket() without having
    // finalized the previous packet.
    PERFETTO_DCHECK(cur_packet_->is_finalized());
  } else {
    cur_packet_.reset(new protos::pbzero::TracePacket());
  }
  cur_packet_->Reset(memory_stream_writer_.get());
  TraceWriter::TracePacketHandle handle(cur_packet_.get());
  // |this| outlives the packet handle.
  handle.set_finalization_listener(this);
  return handle;
}

void StartupTraceWriter::Flush(std::function<void()> callback) {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
  // It's fine to check |was_bound_| instead of acquiring the lock because
  // |trace_writer_| will only need flushing after the first trace packet was
  // written to it, at which point |was_bound_| is set.
  if (PERFETTO_LIKELY(was_bound_)) {
    PERFETTO_DCHECK(trace_writer_);
    return trace_writer_->Flush(std::move(callback));
  }

  // Can't flush while unbound.
  if (callback)
    callback();
}

WriterID StartupTraceWriter::writer_id() const {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
  // We can't acquire the lock because this is a const method. So we'll only
  // proxy to |trace_writer_| once we have written the first packet to it
  // instead.
  if (PERFETTO_LIKELY(was_bound_)) {
    PERFETTO_DCHECK(trace_writer_);
    return trace_writer_->writer_id();
  }
  return 0;
}

uint64_t StartupTraceWriter::written() const {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
  // We can't acquire the lock because this is a const method. So we'll only
  // proxy to |trace_writer_| once we have written the first packet to it
  // instead.
  if (PERFETTO_LIKELY(was_bound_)) {
    PERFETTO_DCHECK(trace_writer_);
    return trace_writer_->written();
  }
  return 0;
}

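// Returns the number of bytes currently buffered locally, or 0 once the writer
// is bound (at which point the local buffer is no longer in use).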
size_t StartupTraceWriter::used_buffer_size() {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
  if (PERFETTO_LIKELY(was_bound_))
    return 0;

  std::lock_guard<std::mutex> lock(lock_);
  if (trace_writer_)
    return 0;

  size_t used_size = 0;
  memory_buffer_->AdjustUsedSizeOfCurrentSlice();
  for (const auto& slice : memory_buffer_->slices()) {
    used_size += slice.GetUsedRange().size();
  }
  return used_size;
}

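// Called when the writer thread finalizes a locally buffered packet. Records
// the packet's size and clears |write_in_progress_| so that binding can
// proceed.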
void StartupTraceWriter::OnMessageFinalized(protozero::Message* message) {
  PERFETTO_DCHECK(cur_packet_.get() == message);
  PERFETTO_DCHECK(cur_packet_->is_finalized());
  // Finalize() is a no-op because the packet is already finalized.
  uint32_t packet_size = cur_packet_->Finalize();
  packet_sizes_.push_back(packet_size);

  // The write is complete, so reset the flag to allow binding.
  std::lock_guard<std::mutex> lock(lock_);
  PERFETTO_DCHECK(write_in_progress_);
  write_in_progress_ = false;
}

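// Copies all locally buffered packets into SMB chunks, splitting packets into
// fragments across chunk boundaries where necessary, and commits the chunks to
// |target_buffer|. Returns the ChunkID that the bound TraceWriter should use
// for its first chunk.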
ChunkID StartupTraceWriter::CommitLocalBufferChunks(
    SharedMemoryArbiterImpl* arbiter,
    WriterID writer_id,
    BufferID target_buffer) {
  // TODO(eseckler): Write and commit these chunks asynchronously. This would
  // require that the service is informed of the missing initial chunks, e.g.
  // by committing our first chunk here before the new trace writer has a
  // chance to commit its first chunk. Otherwise the service wouldn't know to
  // wait for our chunks.

  if (packet_sizes_.empty() || !writer_id)
    return 0;

  memory_buffer_->AdjustUsedSizeOfCurrentSlice();
  LocalBufferReader local_buffer_reader(memory_buffer_.get());

  PERFETTO_DCHECK(local_buffer_reader.TotalUsedSize() ==
                  std::accumulate(packet_sizes_.begin(), packet_sizes_.end(),
                                  static_cast<size_t>(0u)));

  ChunkID next_chunk_id = 0;
  SharedMemoryABI::Chunk cur_chunk =
      NewChunk(arbiter, writer_id, next_chunk_id++, false);

  size_t max_payload_size = cur_chunk.payload_size();
  size_t cur_payload_size = 0;
  uint16_t cur_num_packets = 0;
  size_t total_num_packets = packet_sizes_.size();
  PatchList empty_patch_list;
  for (size_t packet_idx = 0; packet_idx < total_num_packets; packet_idx++) {
    uint32_t packet_size = packet_sizes_[packet_idx];
    uint32_t remaining_packet_size = packet_size;
    ++cur_num_packets;
    do {
      uint32_t fragment_size = static_cast<uint32_t>(
          std::min(static_cast<size_t>(remaining_packet_size),
                   max_payload_size - cur_payload_size -
                       SharedMemoryABI::kPacketHeaderSize));
      // Write the packet header, i.e. the fragment size.
      protozero::proto_utils::WriteRedundantVarInt(
          fragment_size, cur_chunk.payload_begin() + cur_payload_size);
      cur_payload_size += SharedMemoryABI::kPacketHeaderSize;

      // Copy the packet content into the chunk.
      size_t bytes_read = local_buffer_reader.ReadBytes(
          &cur_chunk, fragment_size, cur_payload_size);
      PERFETTO_DCHECK(bytes_read == fragment_size);

      cur_payload_size += fragment_size;
      remaining_packet_size -= fragment_size;

      bool last_write =
          packet_idx == total_num_packets - 1 && remaining_packet_size == 0;

      // Return the current chunk if we've filled its payload, reached the
      // maximum number of packets, or wrote everything we wanted to.
      bool return_chunk =
          cur_payload_size >=
              max_payload_size - SharedMemoryABI::kPacketHeaderSize ||
          cur_num_packets == ChunkHeader::Packets::kMaxCount || last_write;

      if (return_chunk) {
        auto new_packet_count =
            cur_chunk.IncreasePacketCountTo(cur_num_packets);
        PERFETTO_DCHECK(new_packet_count == cur_num_packets);

        bool is_fragmenting = remaining_packet_size > 0;
        if (is_fragmenting) {
          PERFETTO_DCHECK(cur_payload_size == max_payload_size);
          cur_chunk.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
        }

        arbiter->ReturnCompletedChunk(std::move(cur_chunk), target_buffer,
                                      &empty_patch_list);

        // Avoid creating a new chunk after the last write.
        if (!last_write) {
          cur_chunk =
              NewChunk(arbiter, writer_id, next_chunk_id++, is_fragmenting);
          max_payload_size = cur_chunk.payload_size();
          cur_payload_size = 0;
          cur_num_packets = is_fragmenting ? 1 : 0;
        } else {
          PERFETTO_DCHECK(!is_fragmenting);
        }
      }
    } while (remaining_packet_size > 0);
  }

  // The last chunk should have been returned.
  PERFETTO_DCHECK(!cur_chunk.is_valid());
  // We should have read all data from the local buffer.
  PERFETTO_DCHECK(local_buffer_reader.DidReadAllData());

  return next_chunk_id;
}

}  // namespace perfetto