1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/tracing/core/trace_writer_impl.h"
18 
19 #include <string.h>
20 
21 #include <algorithm>
22 #include <type_traits>
23 #include <utility>
24 
25 #include "perfetto/base/logging.h"
26 #include "perfetto/protozero/proto_utils.h"
27 #include "src/tracing/core/shared_memory_arbiter_impl.h"
28 
29 #include "perfetto/trace/trace_packet.pbzero.h"
30 
31 using protozero::proto_utils::kMessageLengthFieldSize;
32 using protozero::proto_utils::WriteRedundantVarInt;
33 using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
34 
35 namespace perfetto {
36 
namespace {
// Size (in bytes) of the length prefix reserved at the start of every
// TracePacket written into a chunk. Mirrors the shared-memory ABI constant.
constexpr size_t kPacketHeaderSize = SharedMemoryABI::kPacketHeaderSize;
}  // namespace
40 
TraceWriterImpl(SharedMemoryArbiterImpl * shmem_arbiter,WriterID id,BufferID target_buffer)41 TraceWriterImpl::TraceWriterImpl(SharedMemoryArbiterImpl* shmem_arbiter,
42                                  WriterID id,
43                                  BufferID target_buffer)
44     : shmem_arbiter_(shmem_arbiter),
45       id_(id),
46       target_buffer_(target_buffer),
47       protobuf_stream_writer_(this) {
48   // TODO(primiano): we could handle the case of running out of TraceWriterID(s)
49   // more gracefully and always return a no-op TracePacket in NewTracePacket().
50   PERFETTO_CHECK(id_ != 0);
51 
52   cur_packet_.reset(new protos::pbzero::TracePacket());
53   cur_packet_->Finalize();  // To avoid the DCHECK in NewTracePacket().
54 }
55 
~TraceWriterImpl()56 TraceWriterImpl::~TraceWriterImpl() {
57   if (cur_chunk_.is_valid()) {
58     cur_packet_->Finalize();
59     Flush();
60   }
61   shmem_arbiter_->ReleaseWriterID(id_);
62 }
63 
Flush(std::function<void ()> callback)64 void TraceWriterImpl::Flush(std::function<void()> callback) {
65   // Flush() cannot be called in the middle of a TracePacket.
66   PERFETTO_CHECK(cur_packet_->is_finalized());
67 
68   if (cur_chunk_.is_valid()) {
69     shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
70                                          &patch_list_);
71   } else {
72     PERFETTO_DCHECK(patch_list_.empty());
73   }
74   // Always issue the Flush request, even if there is nothing to flush, just
75   // for the sake of getting the callback posted back.
76   shmem_arbiter_->FlushPendingCommitDataRequests(callback);
77   protobuf_stream_writer_.Reset({nullptr, nullptr});
78 }
79 
NewTracePacket()80 TraceWriterImpl::TracePacketHandle TraceWriterImpl::NewTracePacket() {
81   // If we hit this, the caller is calling NewTracePacket() without having
82   // finalized the previous packet.
83   PERFETTO_DCHECK(cur_packet_->is_finalized());
84 
85   fragmenting_packet_ = false;
86 
87   // Reserve space for the size of the message. Note: this call might re-enter
88   // into this class invoking GetNewBuffer() if there isn't enough space or if
89   // this is the very first call to NewTracePacket().
90   static_assert(kPacketHeaderSize == kMessageLengthFieldSize,
91                 "The packet header must match the Message header size");
92 
93   // It doesn't make sense to begin a packet that is going to fragment
94   // immediately after (8 is just an arbitrary estimation on the minimum size of
95   // a realistic packet).
96   bool chunk_too_full =
97       protobuf_stream_writer_.bytes_available() < kPacketHeaderSize + 8;
98   if (chunk_too_full || reached_max_packets_per_chunk_) {
99     protobuf_stream_writer_.Reset(GetNewBuffer());
100   }
101 
102   // Send any completed patches to the service to facilitate trace data
103   // recovery by the service. This should only happen when we're completing
104   // the first packet in a chunk which was a continuation from the previous
105   // chunk, i.e. at most once per chunk.
106   if (!patch_list_.empty() && patch_list_.front().is_patched()) {
107     shmem_arbiter_->SendPatches(id_, target_buffer_, &patch_list_);
108   }
109 
110   cur_packet_->Reset(&protobuf_stream_writer_);
111   uint8_t* header = protobuf_stream_writer_.ReserveBytes(kPacketHeaderSize);
112   memset(header, 0, kPacketHeaderSize);
113   cur_packet_->set_size_field(header);
114   uint16_t new_packet_count = cur_chunk_.IncrementPacketCount();
115   reached_max_packets_per_chunk_ =
116       new_packet_count == ChunkHeader::Packets::kMaxCount;
117   TracePacketHandle handle(cur_packet_.get());
118   cur_fragment_start_ = protobuf_stream_writer_.write_ptr();
119   fragmenting_packet_ = true;
120   return handle;
121 }
122 
// Called by the Message. We can get here in two cases:
// 1. In the middle of writing a Message,
// when |fragmenting_packet_| == true. In this case we want to update the
// chunk header with a partial packet and start a new partial packet in the
// new chunk.
// 2. While calling ReserveBytes() for the packet header in NewTracePacket().
// In this case |fragmenting_packet_| == false and we just want a new chunk
// without creating any fragments.
protozero::ContiguousMemoryRange TraceWriterImpl::GetNewBuffer() {
  if (fragmenting_packet_) {
    // Compute how many bytes of the in-flight packet landed in the chunk
    // being retired.
    uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
    PERFETTO_DCHECK(wptr >= cur_fragment_start_);
    uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);
    PERFETTO_DCHECK(partial_size < cur_chunk_.size());

    // Backfill the packet header with the fragment size.
    PERFETTO_DCHECK(partial_size > 0);
    cur_packet_->inc_size_already_written(partial_size);
    cur_chunk_.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
    WriteRedundantVarInt(partial_size, cur_packet_->size_field());

    // Descend in the stack of non-finalized nested submessages (if any) and
    // detour their |size_field| into the |patch_list_|. At this point we have
    // to release the chunk and they cannot write anymore into that.
    // TODO(primiano): add tests to cover this logic.
    bool chunk_needs_patching = false;
    for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
         nested_msg = nested_msg->nested_message()) {
      uint8_t* const cur_hdr = nested_msg->size_field();

      // If this is false the protozero Message has already been instructed to
      // write, upon Finalize(), its size into the patch list.
      bool size_field_points_within_chunk =
          cur_hdr >= cur_chunk_.payload_begin() &&
          cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();

      if (size_field_points_within_chunk) {
        // Redirect the nested message's size field to a patch-list slot; the
        // service will apply the patch once the size is known.
        auto offset =
            static_cast<uint16_t>(cur_hdr - cur_chunk_.payload_begin());
        const ChunkID cur_chunk_id =
            cur_chunk_.header()->chunk_id.load(std::memory_order_relaxed);
        Patch* patch = patch_list_.emplace_back(cur_chunk_id, offset);
        nested_msg->set_size_field(&patch->size_field[0]);
        chunk_needs_patching = true;
      } else {
#if PERFETTO_DCHECK_IS_ON()
        // Ensure that the size field of the message points to an element of
        // the patch list.
        auto patch_it = std::find_if(
            patch_list_.begin(), patch_list_.end(),
            [cur_hdr](const Patch& p) { return &p.size_field[0] == cur_hdr; });
        PERFETTO_DCHECK(patch_it != patch_list_.end());
#endif
      }
    }  // for(nested_msg

    if (chunk_needs_patching)
      cur_chunk_.SetFlag(ChunkHeader::kChunkNeedsPatching);
  }  // if(fragmenting_packet)

  if (cur_chunk_.is_valid()) {
    // ReturnCompletedChunk will consume the first patched entries from
    // |patch_list_| and shrink it.
    shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
                                         &patch_list_);
  }

  // Start a new chunk.

  ChunkHeader::Packets packets = {};
  if (fragmenting_packet_) {
    // The first packet of the new chunk is the continuation of the fragment
    // closed above.
    packets.count = 1;
    packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
  }

  // The memory order of the stores below doesn't really matter. This |header|
  // is just a local temporary object. The GetNewChunk() call below will copy it
  // into the shared buffer with the proper barriers.
  ChunkHeader header = {};
  header.writer_id.store(id_, std::memory_order_relaxed);
  header.chunk_id.store(next_chunk_id_++, std::memory_order_relaxed);
  header.packets.store(packets, std::memory_order_relaxed);

  cur_chunk_ = shmem_arbiter_->GetNewChunk(header);
  reached_max_packets_per_chunk_ = false;
  uint8_t* payload_begin = cur_chunk_.payload_begin();
  if (fragmenting_packet_) {
    // Reserve and zero the continuation fragment's length prefix; it is
    // backfilled later, when the packet finalizes or fragments again.
    cur_packet_->set_size_field(payload_begin);
    memset(payload_begin, 0, kPacketHeaderSize);
    payload_begin += kPacketHeaderSize;
    cur_fragment_start_ = payload_begin;
  }

  return protozero::ContiguousMemoryRange{payload_begin, cur_chunk_.end()};
}
218 
// Returns the id the arbiter assigned at construction time (never 0, see the
// PERFETTO_CHECK in the constructor).
WriterID TraceWriterImpl::writer_id() const {
  return id_;
}
222 
SetFirstChunkId(ChunkID chunk_id)223 bool TraceWriterImpl::SetFirstChunkId(ChunkID chunk_id) {
224   if (next_chunk_id_ > 0)
225     return false;
226   next_chunk_id_ = chunk_id;
227   return true;
228 }
229 
// Base class definitions.
TraceWriter::TraceWriter() = default;
TraceWriter::~TraceWriter() = default;

// Default implementation: the base TraceWriter does not support picking the
// first chunk id. TraceWriterImpl overrides this (see above).
bool TraceWriter::SetFirstChunkId(ChunkID) {
  return false;
}
237 
238 }  // namespace perfetto
239