1 /*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/tracing/core/trace_writer_impl.h"
18
19 #include <string.h>
20
21 #include <algorithm>
22 #include <type_traits>
23 #include <utility>
24
25 #include "perfetto/base/logging.h"
26 #include "perfetto/ext/base/thread_annotations.h"
27 #include "perfetto/protozero/message.h"
28 #include "perfetto/protozero/proto_utils.h"
29 #include "perfetto/protozero/root_message.h"
30 #include "src/tracing/core/shared_memory_arbiter_impl.h"
31
32 #include "protos/perfetto/trace/trace_packet.pbzero.h"
33
34 using protozero::proto_utils::kMessageLengthFieldSize;
35 using protozero::proto_utils::WriteRedundantVarInt;
36 using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
37
38 namespace perfetto {
39
40 namespace {
41 constexpr size_t kPacketHeaderSize = SharedMemoryABI::kPacketHeaderSize;
42 uint8_t g_garbage_chunk[1024];
43 } // namespace
44
TraceWriterImpl(SharedMemoryArbiterImpl * shmem_arbiter,WriterID id,MaybeUnboundBufferID target_buffer,BufferExhaustedPolicy buffer_exhausted_policy)45 TraceWriterImpl::TraceWriterImpl(SharedMemoryArbiterImpl* shmem_arbiter,
46 WriterID id,
47 MaybeUnboundBufferID target_buffer,
48 BufferExhaustedPolicy buffer_exhausted_policy)
49 : shmem_arbiter_(shmem_arbiter),
50 id_(id),
51 target_buffer_(target_buffer),
52 buffer_exhausted_policy_(buffer_exhausted_policy),
53 protobuf_stream_writer_(this),
54 process_id_(base::GetProcessId()) {
55 // TODO(primiano): we could handle the case of running out of TraceWriterID(s)
56 // more gracefully and always return a no-op TracePacket in NewTracePacket().
57 PERFETTO_CHECK(id_ != 0);
58
59 cur_packet_.reset(new protozero::RootMessage<protos::pbzero::TracePacket>());
60 cur_packet_->Finalize(); // To avoid the DCHECK in NewTracePacket().
61 }
62
~TraceWriterImpl()63 TraceWriterImpl::~TraceWriterImpl() {
64 if (cur_chunk_.is_valid()) {
65 cur_packet_->Finalize();
66 Flush();
67 }
68 // This call may cause the shared memory arbiter (and the underlying memory)
69 // to get asynchronously deleted if this was the last trace writer targeting
70 // the arbiter and the arbiter was marked for shutdown.
71 shmem_arbiter_->ReleaseWriterID(id_);
72 }
73
Flush(std::function<void ()> callback)74 void TraceWriterImpl::Flush(std::function<void()> callback) {
75 // Flush() cannot be called in the middle of a TracePacket.
76 PERFETTO_CHECK(cur_packet_->is_finalized());
77
78 if (cur_chunk_.is_valid()) {
79 shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
80 &patch_list_);
81 } else {
82 // When in stall mode, all patches should have been returned with the last
83 // chunk, since the last packet was completed. In drop_packets_ mode, this
84 // may not be the case because the packet may have been fragmenting when
85 // SMB exhaustion occurred and |cur_chunk_| became invalid. In this case,
86 // drop_packets_ should be true.
87 PERFETTO_DCHECK(patch_list_.empty() || drop_packets_);
88 }
89
90 // Always issue the Flush request, even if there is nothing to flush, just
91 // for the sake of getting the callback posted back.
92 shmem_arbiter_->FlushPendingCommitDataRequests(callback);
93 protobuf_stream_writer_.Reset({nullptr, nullptr});
94
95 // |last_packet_size_field_| might have pointed into the chunk we returned.
96 last_packet_size_field_ = nullptr;
97 }
98
NewTracePacket()99 TraceWriterImpl::TracePacketHandle TraceWriterImpl::NewTracePacket() {
100 // If we hit this, the caller is calling NewTracePacket() without having
101 // finalized the previous packet.
102 PERFETTO_CHECK(cur_packet_->is_finalized());
103 // If we hit this, this trace writer was created in a different process. This
104 // likely means that the process forked while tracing was active, and the
105 // forked child process tried to emit a trace event. This is not supported, as
106 // it would lead to two processes writing to the same tracing SMB.
107 PERFETTO_DCHECK(process_id_ == base::GetProcessId());
108
109 fragmenting_packet_ = false;
110
111 // Reserve space for the size of the message. Note: this call might re-enter
112 // into this class invoking GetNewBuffer() if there isn't enough space or if
113 // this is the very first call to NewTracePacket().
114 static_assert(kPacketHeaderSize == kMessageLengthFieldSize,
115 "The packet header must match the Message header size");
116
117 bool was_dropping_packets = drop_packets_;
118
119 // It doesn't make sense to begin a packet that is going to fragment
120 // immediately after (8 is just an arbitrary estimation on the minimum size of
121 // a realistic packet).
122 bool chunk_too_full =
123 protobuf_stream_writer_.bytes_available() < kPacketHeaderSize + 8;
124 if (chunk_too_full || reached_max_packets_per_chunk_ ||
125 retry_new_chunk_after_packet_) {
126 protobuf_stream_writer_.Reset(GetNewBuffer());
127 }
128
129 // Send any completed patches to the service to facilitate trace data
130 // recovery by the service. This should only happen when we're completing
131 // the first packet in a chunk which was a continuation from the previous
132 // chunk, i.e. at most once per chunk.
133 if (!patch_list_.empty() && patch_list_.front().is_patched()) {
134 shmem_arbiter_->SendPatches(id_, target_buffer_, &patch_list_);
135 }
136
137 cur_packet_->Reset(&protobuf_stream_writer_);
138 uint8_t* header = protobuf_stream_writer_.ReserveBytes(kPacketHeaderSize);
139 memset(header, 0, kPacketHeaderSize);
140 cur_packet_->set_size_field(header);
141 last_packet_size_field_ = header;
142
143 TracePacketHandle handle(cur_packet_.get());
144 cur_fragment_start_ = protobuf_stream_writer_.write_ptr();
145 fragmenting_packet_ = true;
146
147 if (PERFETTO_LIKELY(!drop_packets_)) {
148 uint16_t new_packet_count = cur_chunk_.IncrementPacketCount();
149 reached_max_packets_per_chunk_ =
150 new_packet_count == ChunkHeader::Packets::kMaxCount;
151
152 if (PERFETTO_UNLIKELY(was_dropping_packets)) {
153 // We've succeeded to get a new chunk from the SMB after we entered
154 // drop_packets_ mode. Record a marker into the new packet to indicate the
155 // data loss.
156 cur_packet_->set_previous_packet_dropped(true);
157 }
158 }
159
160 return handle;
161 }
162
163 // Called by the Message. We can get here in two cases:
164 // 1. In the middle of writing a Message,
165 // when |fragmenting_packet_| == true. In this case we want to update the
166 // chunk header with a partial packet and start a new partial packet in the
167 // new chunk.
168 // 2. While calling ReserveBytes() for the packet header in NewTracePacket().
169 // In this case |fragmenting_packet_| == false and we just want a new chunk
170 // without creating any fragments.
GetNewBuffer()171 protozero::ContiguousMemoryRange TraceWriterImpl::GetNewBuffer() {
172 if (fragmenting_packet_ && drop_packets_) {
173 // We can't write the remaining data of the fragmenting packet to a new
174 // chunk, because we have already lost some of its data in the garbage
175 // chunk. Thus, we will wrap around in the garbage chunk, wait until the
176 // current packet was completed, and then attempt to get a new chunk from
177 // the SMB again. Instead, if |drop_packets_| is true and
178 // |fragmenting_packet_| is false, we try to acquire a valid chunk because
179 // the SMB exhaustion might be resolved.
180 retry_new_chunk_after_packet_ = true;
181 return protozero::ContiguousMemoryRange{
182 &g_garbage_chunk[0], &g_garbage_chunk[0] + sizeof(g_garbage_chunk)};
183 }
184
185 // Attempt to grab the next chunk before finalizing the current one, so that
186 // we know whether we need to start dropping packets before writing the
187 // current packet fragment's header.
188 ChunkHeader::Packets packets = {};
189 if (fragmenting_packet_) {
190 packets.count = 1;
191 packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
192 }
193
194 // The memory order of the stores below doesn't really matter. This |header|
195 // is just a local temporary object. The GetNewChunk() call below will copy it
196 // into the shared buffer with the proper barriers.
197 ChunkHeader header = {};
198 header.writer_id.store(id_, std::memory_order_relaxed);
199 header.chunk_id.store(next_chunk_id_, std::memory_order_relaxed);
200 header.packets.store(packets, std::memory_order_relaxed);
201
202 SharedMemoryABI::Chunk new_chunk =
203 shmem_arbiter_->GetNewChunk(header, buffer_exhausted_policy_);
204 if (!new_chunk.is_valid()) {
205 // Shared memory buffer exhausted, switch into |drop_packets_| mode. We'll
206 // drop data until the garbage chunk has been filled once and then retry.
207
208 // If we started a packet in one of the previous (valid) chunks, we need to
209 // tell the service to discard it.
210 if (fragmenting_packet_) {
211 // We can only end up here if the previous chunk was a valid chunk,
212 // because we never try to acquire a new chunk in |drop_packets_| mode
213 // while fragmenting.
214 PERFETTO_DCHECK(!drop_packets_);
215
216 // Backfill the last fragment's header with an invalid size (too large),
217 // so that the service's TraceBuffer throws out the incomplete packet.
218 // It'll restart reading from the next chunk we submit.
219 WriteRedundantVarInt(SharedMemoryABI::kPacketSizeDropPacket,
220 cur_packet_->size_field());
221
222 // Reset the size field, since we should not write the current packet's
223 // size anymore after this.
224 cur_packet_->set_size_field(nullptr);
225
226 // We don't set kLastPacketContinuesOnNextChunk or kChunkNeedsPatching on
227 // the last chunk, because its last fragment will be discarded anyway.
228 // However, the current packet fragment points to a valid |cur_chunk_| and
229 // may have non-finalized nested messages which will continue in the
230 // garbage chunk and currently still point into |cur_chunk_|. As we are
231 // about to return |cur_chunk_|, we need to invalidate the size fields of
232 // those nested messages. Normally we move them in the |patch_list_| (see
233 // below) but in this case, it doesn't make sense to send patches for a
234 // fragment that will be discarded for sure. Thus, we clean up any size
235 // field references into |cur_chunk_|.
236 for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
237 nested_msg = nested_msg->nested_message()) {
238 uint8_t* const cur_hdr = nested_msg->size_field();
239
240 // If this is false the protozero Message has already been instructed to
241 // write, upon Finalize(), its size into the patch list.
242 bool size_field_points_within_chunk =
243 cur_hdr >= cur_chunk_.payload_begin() &&
244 cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();
245
246 if (size_field_points_within_chunk)
247 nested_msg->set_size_field(nullptr);
248 }
249 } else if (!drop_packets_ && last_packet_size_field_) {
250 // If we weren't dropping packets before, we should indicate to the
251 // service that we're about to lose data. We do this by invalidating the
252 // size of the last packet in |cur_chunk_|. The service will record
253 // statistics about packets with kPacketSizeDropPacket size.
254 PERFETTO_DCHECK(cur_packet_->is_finalized());
255 PERFETTO_DCHECK(cur_chunk_.is_valid());
256
257 // |last_packet_size_field_| should point within |cur_chunk_|'s payload.
258 PERFETTO_DCHECK(last_packet_size_field_ >= cur_chunk_.payload_begin() &&
259 last_packet_size_field_ + kMessageLengthFieldSize <=
260 cur_chunk_.end());
261
262 WriteRedundantVarInt(SharedMemoryABI::kPacketSizeDropPacket,
263 last_packet_size_field_);
264 }
265
266 if (cur_chunk_.is_valid()) {
267 shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_),
268 target_buffer_, &patch_list_);
269 }
270
271 drop_packets_ = true;
272 cur_chunk_ = SharedMemoryABI::Chunk(); // Reset to an invalid chunk.
273 reached_max_packets_per_chunk_ = false;
274 retry_new_chunk_after_packet_ = false;
275 last_packet_size_field_ = nullptr;
276
277 PERFETTO_ANNOTATE_BENIGN_RACE_SIZED(&g_garbage_chunk,
278 sizeof(g_garbage_chunk),
279 "nobody reads the garbage chunk")
280 return protozero::ContiguousMemoryRange{
281 &g_garbage_chunk[0], &g_garbage_chunk[0] + sizeof(g_garbage_chunk)};
282 } // if (!new_chunk.is_valid())
283
284 PERFETTO_DCHECK(new_chunk.is_valid());
285
286 if (fragmenting_packet_) {
287 // We should not be fragmenting a packet after we exited drop_packets_ mode,
288 // because we only retry to get a new chunk when a fresh packet is started.
289 PERFETTO_DCHECK(!drop_packets_);
290
291 uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
292 PERFETTO_DCHECK(wptr >= cur_fragment_start_);
293 uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);
294 PERFETTO_DCHECK(partial_size < cur_chunk_.size());
295
296 // Backfill the packet header with the fragment size.
297 PERFETTO_DCHECK(partial_size > 0);
298 cur_packet_->inc_size_already_written(partial_size);
299 cur_chunk_.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
300 WriteRedundantVarInt(partial_size, cur_packet_->size_field());
301
302 // Descend in the stack of non-finalized nested submessages (if any) and
303 // detour their |size_field| into the |patch_list_|. At this point we have
304 // to release the chunk and they cannot write anymore into that.
305 // TODO(primiano): add tests to cover this logic.
306 bool chunk_needs_patching = false;
307 for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
308 nested_msg = nested_msg->nested_message()) {
309 uint8_t* const cur_hdr = nested_msg->size_field();
310
311 // If this is false the protozero Message has already been instructed to
312 // write, upon Finalize(), its size into the patch list.
313 bool size_field_points_within_chunk =
314 cur_hdr >= cur_chunk_.payload_begin() &&
315 cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();
316
317 if (size_field_points_within_chunk) {
318 auto offset =
319 static_cast<uint16_t>(cur_hdr - cur_chunk_.payload_begin());
320 const ChunkID cur_chunk_id =
321 cur_chunk_.header()->chunk_id.load(std::memory_order_relaxed);
322 Patch* patch = patch_list_.emplace_back(cur_chunk_id, offset);
323 nested_msg->set_size_field(&patch->size_field[0]);
324 chunk_needs_patching = true;
325 } else {
326 #if PERFETTO_DCHECK_IS_ON()
327 // Ensure that the size field of the message points to an element of the
328 // patch list.
329 auto patch_it = std::find_if(
330 patch_list_.begin(), patch_list_.end(),
331 [cur_hdr](const Patch& p) { return &p.size_field[0] == cur_hdr; });
332 PERFETTO_DCHECK(patch_it != patch_list_.end());
333 #endif
334 }
335 } // for(nested_msg
336
337 if (chunk_needs_patching)
338 cur_chunk_.SetFlag(ChunkHeader::kChunkNeedsPatching);
339 } // if(fragmenting_packet)
340
341 if (cur_chunk_.is_valid()) {
342 // ReturnCompletedChunk will consume the first patched entries from
343 // |patch_list_| and shrink it.
344 shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
345 &patch_list_);
346 }
347
348 // Switch to the new chunk.
349 drop_packets_ = false;
350 reached_max_packets_per_chunk_ = false;
351 retry_new_chunk_after_packet_ = false;
352 next_chunk_id_++;
353 cur_chunk_ = std::move(new_chunk);
354 last_packet_size_field_ = nullptr;
355
356 uint8_t* payload_begin = cur_chunk_.payload_begin();
357 if (fragmenting_packet_) {
358 cur_packet_->set_size_field(payload_begin);
359 last_packet_size_field_ = payload_begin;
360 memset(payload_begin, 0, kPacketHeaderSize);
361 payload_begin += kPacketHeaderSize;
362 cur_fragment_start_ = payload_begin;
363 }
364
365 return protozero::ContiguousMemoryRange{payload_begin, cur_chunk_.end()};
366 }
367
writer_id() const368 WriterID TraceWriterImpl::writer_id() const {
369 return id_;
370 }
371
372 // Base class definitions.
373 TraceWriter::TraceWriter() = default;
374 TraceWriter::~TraceWriter() = default;
375
376 } // namespace perfetto
377