1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/rpc/proto_ring_buffer.h"
18 
19 #include "perfetto/base/logging.h"
20 #include "perfetto/ext/base/paged_memory.h"
21 #include "perfetto/protozero/proto_utils.h"
22 
23 namespace perfetto {
24 namespace trace_processor {
25 
26 namespace {
// Size in bytes of the buffer allocated by the constructor, and the increment
// by which Append() enlarges the buffer when compaction cannot make room.
constexpr size_t kGrowBytes = 128 * 1024;
28 
FramingError()29 inline ProtoRingBuffer::Message FramingError() {
30   ProtoRingBuffer::Message msg{};
31   msg.fatal_framing_error = true;
32   return msg;
33 }
34 
35 // Tries to decode a length-delimited proto field from |start|.
36 // Returns a valid boundary if the preamble is valid and the length is within
37 // |end|, or an invalid message otherwise.
TryReadMessage(const uint8_t * start,const uint8_t * end)38 ProtoRingBuffer::Message TryReadMessage(const uint8_t* start,
39                                         const uint8_t* end) {
40   namespace proto_utils = protozero::proto_utils;
41   uint64_t field_tag = 0;
42   auto* start_of_len = proto_utils::ParseVarInt(start, end, &field_tag);
43   if (start_of_len == start)
44     return ProtoRingBuffer::Message{};  // Not enough data.
45 
46   const uint32_t tag = field_tag & 0x07;
47   if (tag !=
48       static_cast<uint32_t>(proto_utils::ProtoWireType::kLengthDelimited)) {
49     PERFETTO_ELOG("RPC framing error, unexpected msg tag 0x%xu", tag);
50     return FramingError();
51   }
52 
53   uint64_t msg_len = 0;
54   auto* start_of_msg = proto_utils::ParseVarInt(start_of_len, end, &msg_len);
55   if (start_of_msg == start_of_len)
56     return ProtoRingBuffer::Message{};  // Not enough data.
57 
58   if (msg_len > ProtoRingBuffer::kMaxMsgSize) {
59     PERFETTO_ELOG("RPC framing error, message too large (%" PRIu64 " > %zu)",
60                   msg_len, ProtoRingBuffer::kMaxMsgSize);
61     return FramingError();
62   }
63 
64   if (start_of_msg + msg_len > end)
65     return ProtoRingBuffer::Message{};  // Not enough data.
66 
67   ProtoRingBuffer::Message msg{};
68   msg.start = start_of_msg;
69   msg.len = static_cast<uint32_t>(msg_len);
70   msg.field_id = static_cast<uint32_t>(field_tag >> 3);
71   return msg;
72 }
73 
74 }  // namespace
75 
ProtoRingBuffer()76 ProtoRingBuffer::ProtoRingBuffer()
77     : buf_(base::PagedMemory::Allocate(kGrowBytes)) {}
78 ProtoRingBuffer::~ProtoRingBuffer() = default;
79 
Append(const void * data_void,size_t data_len)80 void ProtoRingBuffer::Append(const void* data_void, size_t data_len) {
81   if (failed_)
82     return;
83   const uint8_t* data = static_cast<const uint8_t*>(data_void);
84   PERFETTO_DCHECK(wr_ <= buf_.size());
85   PERFETTO_DCHECK(wr_ >= rd_);
86 
87   // If the last call to ReadMessage() consumed all the data in the buffer and
88   // there are no incomplete messages pending, restart from the beginning rather
89   // than keep ringing. This is the most common case.
90   if (rd_ == wr_)
91     rd_ = wr_ = 0;
92 
93   // The caller is expected to always issue a ReadMessage() after each Append().
94   PERFETTO_CHECK(!fastpath_.valid());
95   if (rd_ == wr_) {
96     auto msg = TryReadMessage(data, data + data_len);
97     if (msg.valid() && msg.end() == (data + data_len)) {
98       // Fastpath: in many cases, the underlying stream will effectively
99       // preserve the atomicity of messages for most small messages.
100       // In this case we can avoid the extra buf_ roundtrip and just pass a
101       // pointer to |data| + (proto preamble len).
102       // The next call to ReadMessage)= will return |fastpath_|.
103       fastpath_ = std::move(msg);
104       return;
105     }
106   }
107 
108   size_t avail = buf_.size() - wr_;
109   if (data_len > avail) {
110     // This whole section should be hit extremely rare.
111 
112     // Try first just recompacting the buffer by moving everything to the left.
113     // This can happen if we received "a message and a bit" on each Append call
114     // so we ended pup in a situation like:
115     // buf_: [unused space] [msg1 incomplete]
116     //                      ^rd_             ^wr_
117     //
118     // After recompaction:
119     // buf_: [msg1 incomplete]
120     //       ^rd_             ^wr_
121     uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
122     memmove(&buf[0], &buf[rd_], wr_ - rd_);
123     avail += rd_;
124     wr_ -= rd_;
125     rd_ = 0;
126     if (data_len > avail) {
127       // The compaction didn't free up enough space and we need to expand the
128       // ring buffer. Yes, we could have detected this earlier and split the
129       // code paths, rather than first compacting and then realizing it wasn't
130       // sufficient. However, that would make the code harder to reason about,
131       // creating code paths that are nearly never hit, hence making it more
132       // likely to accumulate bugs in future. All this is very rare.
133       size_t new_size = buf_.size();
134       while (data_len > new_size - wr_)
135         new_size += kGrowBytes;
136       if (new_size > kMaxMsgSize * 2) {
137         failed_ = true;
138         return;
139       }
140       auto new_buf = base::PagedMemory::Allocate(new_size);
141       memcpy(new_buf.Get(), buf_.Get(), buf_.size());
142       buf_ = std::move(new_buf);
143       avail = new_size - wr_;
144       // No need to touch rd_ / wr_ cursors.
145     }
146   }
147 
148   // Append the received data at the end of the ring buffer.
149   uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
150   memcpy(&buf[wr_], data, data_len);
151   wr_ += data_len;
152 }
153 
ReadMessage()154 ProtoRingBuffer::Message ProtoRingBuffer::ReadMessage() {
155   if (failed_)
156     return FramingError();
157 
158   if (fastpath_.valid()) {
159     // The fastpath can only be hit when the buffer is empty.
160     PERFETTO_CHECK(rd_ == wr_);
161     auto msg = std::move(fastpath_);
162     fastpath_ = Message{};
163     return msg;
164   }
165 
166   uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
167 
168   PERFETTO_DCHECK(rd_ <= wr_);
169   if (rd_ >= wr_)
170     return Message{};  // Completely empty.
171 
172   auto msg = TryReadMessage(&buf[rd_], &buf[wr_]);
173   if (!msg.valid()) {
174     failed_ = failed_ || msg.fatal_framing_error;
175     return msg;  // Return |msg| because it could be a framing error.
176   }
177 
178   // Note: msg.start is > buf[rd_], because it skips the proto preamble.
179   PERFETTO_DCHECK(msg.start > &buf[rd_]);
180   const uint8_t* msg_end = msg.start + msg.len;
181   PERFETTO_CHECK(msg_end > &buf[rd_] && msg_end <= &buf[wr_]);
182   auto msg_outer_len = static_cast<size_t>(msg_end - &buf[rd_]);
183   rd_ += msg_outer_len;
184   return msg;
185 }
186 
187 }  // namespace trace_processor
188 }  // namespace perfetto
189