1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/ipc/buffered_frame_deserializer.h"
18 
19 #include <inttypes.h>
20 
21 #include <algorithm>
22 #include <type_traits>
23 #include <utility>
24 
25 #include "perfetto/base/logging.h"
26 #include "perfetto/ext/base/utils.h"
27 
28 #include "protos/perfetto/ipc/wire_protocol.gen.h"
29 
30 namespace perfetto {
31 namespace ipc {
32 
33 namespace {
34 
35 // The header is just the number of bytes of the Frame protobuf message.
36 constexpr size_t kHeaderSize = sizeof(uint32_t);
37 }  // namespace
38 
BufferedFrameDeserializer(size_t max_capacity)39 BufferedFrameDeserializer::BufferedFrameDeserializer(size_t max_capacity)
40     : capacity_(max_capacity) {
41   PERFETTO_CHECK(max_capacity % base::GetSysPageSize() == 0);
42   PERFETTO_CHECK(max_capacity >= base::GetSysPageSize());
43 }
44 
45 BufferedFrameDeserializer::~BufferedFrameDeserializer() = default;
46 
47 BufferedFrameDeserializer::ReceiveBuffer
BeginReceive()48 BufferedFrameDeserializer::BeginReceive() {
49   // Upon the first recv initialize the buffer to the max message size but
50   // release the physical memory for all but the first page. The kernel will
51   // automatically give us physical pages back as soon as we page-fault on them.
52   if (!buf_.IsValid()) {
53     PERFETTO_DCHECK(size_ == 0);
54     // TODO(eseckler): Don't commit all of the buffer at once on Windows.
55     buf_ = base::PagedMemory::Allocate(capacity_);
56 
57     // Surely we are going to use at least the first page, but we may not need
58     // the rest for a bit.
59     const auto page_size = base::GetSysPageSize();
60     buf_.AdviseDontNeed(buf() + page_size, capacity_ - page_size);
61   }
62 
63   PERFETTO_CHECK(capacity_ > size_);
64   return ReceiveBuffer{buf() + size_, capacity_ - size_};
65 }
66 
EndReceive(size_t recv_size)67 bool BufferedFrameDeserializer::EndReceive(size_t recv_size) {
68   const auto page_size = base::GetSysPageSize();
69   PERFETTO_CHECK(recv_size + size_ <= capacity_);
70   size_ += recv_size;
71 
72   // At this point the contents buf_ can contain:
73   // A) Only a fragment of the header (the size of the frame). E.g.,
74   //    03 00 00 (the header is 4 bytes, one is missing).
75   //
76   // B) A header and a part of the frame. E.g.,
77   //     05 00 00 00         11 22 33
78   //    [ header, size=5 ]  [ Partial frame ]
79   //
80   // C) One or more complete header+frame. E.g.,
81   //     05 00 00 00         11 22 33 44 55   03 00 00 00        AA BB CC
82   //    [ header, size=5 ]  [ Whole frame ]  [ header, size=3 ] [ Whole frame ]
83   //
84   // D) Some complete header+frame(s) and a partial header or frame (C + A/B).
85   //
86   // C Is the more likely case and the one we are optimizing for. A, B, D can
87   // happen because of the streaming nature of the socket.
88   // The invariant of this function is that, when it returns, buf_ is either
89   // empty (we drained all the complete frames) or starts with the header of the
90   // next, still incomplete, frame.
91 
92   size_t consumed_size = 0;
93   for (;;) {
94     if (size_ < consumed_size + kHeaderSize)
95       break;  // Case A, not enough data to read even the header.
96 
97     // Read the header into |payload_size|.
98     uint32_t payload_size = 0;
99     const char* rd_ptr = buf() + consumed_size;
100     memcpy(base::AssumeLittleEndian(&payload_size), rd_ptr, kHeaderSize);
101 
102     // Saturate the |payload_size| to prevent overflows. The > capacity_ check
103     // below will abort the parsing.
104     size_t next_frame_size =
105         std::min(static_cast<size_t>(payload_size), capacity_);
106     next_frame_size += kHeaderSize;
107     rd_ptr += kHeaderSize;
108 
109     if (size_ < consumed_size + next_frame_size) {
110       // Case B. We got the header but not the whole frame.
111       if (next_frame_size > capacity_) {
112         // The caller is expected to shut down the socket and give up at this
113         // point. If it doesn't do that and insists going on at some point it
114         // will hit the capacity check in BeginReceive().
115         PERFETTO_LOG("IPC Frame too large (size %zu)", next_frame_size);
116         return false;
117       }
118       break;
119     }
120 
121     // Case C. We got at least one header and whole frame.
122     DecodeFrame(rd_ptr, payload_size);
123     consumed_size += next_frame_size;
124   }
125 
126   PERFETTO_DCHECK(consumed_size <= size_);
127   if (consumed_size > 0) {
128     // Shift out the consumed data from the buffer. In the typical case (C)
129     // there is nothing to shift really, just setting size_ = 0 is enough.
130     // Shifting is only for the (unlikely) case D.
131     size_ -= consumed_size;
132     if (size_ > 0) {
133       // Case D. We consumed some frames but there is a leftover at the end of
134       // the buffer. Shift out the consumed bytes, so that on the next round
135       // |buf_| starts with the header of the next unconsumed frame.
136       const char* move_begin = buf() + consumed_size;
137       PERFETTO_CHECK(move_begin > buf());
138       PERFETTO_CHECK(move_begin + size_ <= buf() + capacity_);
139       memmove(buf(), move_begin, size_);
140     }
141     // If we just finished decoding a large frame that used more than one page,
142     // release the extra memory in the buffer. Large frames should be quite
143     // rare.
144     if (consumed_size > page_size) {
145       size_t size_rounded_up = (size_ / page_size + 1) * page_size;
146       if (size_rounded_up < capacity_) {
147         char* madvise_begin = buf() + size_rounded_up;
148         const size_t madvise_size = capacity_ - size_rounded_up;
149         PERFETTO_CHECK(madvise_begin > buf() + size_);
150         PERFETTO_CHECK(madvise_begin + madvise_size <= buf() + capacity_);
151         buf_.AdviseDontNeed(madvise_begin, madvise_size);
152       }
153     }
154   }
155   // At this point |size_| == 0 for case C, > 0 for cases A, B, D.
156   return true;
157 }
158 
PopNextFrame()159 std::unique_ptr<Frame> BufferedFrameDeserializer::PopNextFrame() {
160   if (decoded_frames_.empty())
161     return nullptr;
162   std::unique_ptr<Frame> frame = std::move(decoded_frames_.front());
163   decoded_frames_.pop_front();
164   return frame;
165 }
166 
DecodeFrame(const char * data,size_t size)167 void BufferedFrameDeserializer::DecodeFrame(const char* data, size_t size) {
168   if (size == 0)
169     return;
170   std::unique_ptr<Frame> frame(new Frame);
171   if (frame->ParseFromArray(data, size))
172     decoded_frames_.push_back(std::move(frame));
173 }
174 
175 // static
Serialize(const Frame & frame)176 std::string BufferedFrameDeserializer::Serialize(const Frame& frame) {
177   std::vector<uint8_t> payload = frame.SerializeAsArray();
178   const uint32_t payload_size = static_cast<uint32_t>(payload.size());
179   std::string buf;
180   buf.resize(kHeaderSize + payload_size);
181   memcpy(&buf[0], base::AssumeLittleEndian(&payload_size), kHeaderSize);
182   memcpy(&buf[kHeaderSize], payload.data(), payload.size());
183   return buf;
184 }
185 
186 }  // namespace ipc
187 }  // namespace perfetto
188