1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/perfetto_cmd/packet_writer.h"
18 
19 #include <array>
20 
21 #include <fcntl.h>
22 #include <signal.h>
23 #include <stdio.h>
24 #include <sys/stat.h>
25 
26 #include "perfetto/base/build_config.h"
27 #include "perfetto/ext/base/getopt.h"
28 #include "perfetto/ext/base/paged_memory.h"
29 #include "perfetto/ext/base/utils.h"
30 #include "perfetto/ext/tracing/core/trace_packet.h"
31 #include "perfetto/protozero/proto_utils.h"
32 
33 #if PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
34 #include <zlib.h>
35 #endif
36 
37 namespace perfetto {
38 namespace {
39 
40 using protozero::proto_utils::MakeTagLengthDelimited;
41 using protozero::proto_utils::WriteVarInt;
42 using Preamble = std::array<char, 16>;
43 
44 // ID of the |packet| field in trace.proto. Hardcoded as this we don't
45 // want to depend on protos/trace:lite for binary size saving reasons.
46 constexpr uint32_t kPacketId = 1;
47 
48 #if PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
49 
50 // ID of |compressed_packets| in trace_packet.proto.
51 constexpr uint32_t kCompressedPacketsId = 50;
52 
53 // Some transport mechanisms have a 512kb limit on packet size.
54 // ZipPacketWriter respects this limit where possible and does
55 // not produce compressed packets larger than 512kb. This is
56 // constant is deliberately conservative to leave plenty of
57 // room for the transport to add additional headers etc.
58 const size_t kMaxPacketSize = 500 * 1024;
59 
60 // After every kPendingBytesLimit we do a Z_SYNC_FLUSH in the zlib stream.
61 const size_t kPendingBytesLimit = 32 * 1024;
62 
63 #endif  // PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
64 
65 template <uint32_t id>
GetPreamble(size_t sz,Preamble * preamble)66 size_t GetPreamble(size_t sz, Preamble* preamble) {
67   uint8_t* ptr = reinterpret_cast<uint8_t*>(preamble->data());
68   constexpr uint32_t tag = MakeTagLengthDelimited(id);
69   ptr = WriteVarInt(tag, ptr);
70   ptr = WriteVarInt(sz, ptr);
71   size_t preamble_size = reinterpret_cast<uintptr_t>(ptr) -
72                          reinterpret_cast<uintptr_t>(preamble->data());
73   PERFETTO_DCHECK(preamble_size < preamble->size());
74   return preamble_size;
75 }
76 
77 class FilePacketWriter : public PacketWriter {
78  public:
79   FilePacketWriter(FILE* fd);
80   ~FilePacketWriter() override;
81   bool WritePacket(const TracePacket& packet) override;
82 
83  private:
84   FILE* fd_;
85 };
86 
FilePacketWriter(FILE * fd)87 FilePacketWriter::FilePacketWriter(FILE* fd) : fd_(fd) {}
88 
~FilePacketWriter()89 FilePacketWriter::~FilePacketWriter() {
90   fflush(fd_);
91 }
92 
WritePacket(const TracePacket & packet)93 bool FilePacketWriter::WritePacket(const TracePacket& packet) {
94   Preamble preamble;
95   size_t size = GetPreamble<kPacketId>(packet.size(), &preamble);
96   if (fwrite(preamble.data(), 1, size, fd_) != size)
97     return false;
98   for (const Slice& slice : packet.slices()) {
99     if (fwrite(reinterpret_cast<const char*>(slice.start), 1, slice.size,
100                fd_) != slice.size) {
101       return false;
102     }
103   }
104 
105   return true;
106 }
107 
108 #if PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
109 
110 class ZipPacketWriter : public PacketWriter {
111  public:
112   ZipPacketWriter(std::unique_ptr<PacketWriter>);
113   ~ZipPacketWriter() override;
114   bool WritePacket(const TracePacket& packet) override;
115 
116  private:
117   void CheckEq(int actual_code, int expected_code);
118   bool FinalizeCompressedPacket();
Deflate(const char * ptr,size_t size)119   inline void Deflate(const char* ptr, size_t size) {
120     return Deflate(reinterpret_cast<const uint8_t*>(ptr), size);
121   }
Deflate(const void * ptr,size_t size)122   inline void Deflate(const void* ptr, size_t size) {
123     return Deflate(reinterpret_cast<const uint8_t*>(ptr), size);
124   }
125   void Deflate(const uint8_t* ptr, size_t size);
126 
127   std::unique_ptr<PacketWriter> writer_;
128   z_stream stream_{};
129 
130   base::PagedMemory buf_;
131   uint8_t* const start_;
132   uint8_t* const end_;
133 
134   bool is_compressing_ = false;
135   size_t pending_bytes_ = 0;
136 };
137 
ZipPacketWriter(std::unique_ptr<PacketWriter> writer)138 ZipPacketWriter::ZipPacketWriter(std::unique_ptr<PacketWriter> writer)
139     : writer_(std::move(writer)),
140       buf_(base::PagedMemory::Allocate(kMaxPacketSize)),
141       start_(static_cast<uint8_t*>(buf_.Get())),
142       end_(start_ + buf_.size()) {}
143 
~ZipPacketWriter()144 ZipPacketWriter::~ZipPacketWriter() {
145   if (is_compressing_)
146     FinalizeCompressedPacket();
147 }
148 
WritePacket(const TracePacket & packet)149 bool ZipPacketWriter::WritePacket(const TracePacket& packet) {
150   // If we have already written one compressed packet, check whether we should
151   // flush the buffer.
152   if (is_compressing_) {
153     // We have two goals:
154     // - Fit as much data as possible into each packet
155     // - Ensure each packet is under 512KB
156     // We keep track of two numbers:
157     // - the number of remaining bytes in the output buffer
158     // - the number of (pending) uncompressed bytes written since the last flush
159     // The pending bytes may or may not have appeared in output buffer.
160     // Assuming in the worst case each uncompressed input byte can turn into
161     // two compressed bytes we can ensure we don't go over 512KB by not letting
162     // the number of pending bytes go over remaining bytes/2 - however often
163     // each input byte will not turn into 2 output bytes but less than 1 output
164     // byte - so this underfills the packet. To avoid this every 32kb we deflate
165     // with Z_SYNC_FLUSH ensuring all pending bytes are present in the output
166     // buffer.
167     if (pending_bytes_ > kPendingBytesLimit) {
168       CheckEq(deflate(&stream_, Z_SYNC_FLUSH), Z_OK);
169       pending_bytes_ = 0;
170     }
171 
172     PERFETTO_DCHECK(end_ >= stream_.next_out);
173     size_t remaining = static_cast<size_t>(end_ - stream_.next_out);
174     if ((pending_bytes_ + packet.size() + 1024) * 2 > remaining) {
175       if (!FinalizeCompressedPacket()) {
176         return false;
177       }
178     }
179   }
180 
181   // Do not attempt to compress large packets (since they may overflow our
182   // output buffer) instead insert them directly into the output stream:
183   if (packet.size() > kMaxPacketSize) {
184     // We can't be compressing here, if we were we would have attempted
185     // to FinalizeCompressedPacket() above:
186     PERFETTO_DCHECK(!is_compressing_);
187     return writer_->WritePacket(packet);
188   }
189 
190   // Reinitialize the compresser if needed:
191   if (!is_compressing_) {
192     memset(&stream_, 0, sizeof(stream_));
193     CheckEq(deflateInit(&stream_, 6), Z_OK);
194     is_compressing_ = true;
195     stream_.next_out = start_;
196     stream_.avail_out = static_cast<unsigned int>(end_ - start_);
197   }
198 
199   // Compress the trace packet header:
200   Preamble packet_hdr;
201   size_t packet_hdr_size = GetPreamble<kPacketId>(packet.size(), &packet_hdr);
202   Deflate(packet_hdr.data(), packet_hdr_size);
203 
204   // Compress the trace packet itself:
205   for (const Slice& slice : packet.slices()) {
206     Deflate(slice.start, slice.size);
207   }
208 
209   return true;
210 }
211 
FinalizeCompressedPacket()212 bool ZipPacketWriter::FinalizeCompressedPacket() {
213   PERFETTO_DCHECK(is_compressing_);
214 
215   CheckEq(deflate(&stream_, Z_FINISH), Z_STREAM_END);
216 
217   size_t size = static_cast<size_t>(stream_.next_out - start_);
218   Preamble preamble;
219   size_t preamble_size = GetPreamble<kCompressedPacketsId>(size, &preamble);
220 
221   std::vector<TracePacket> out_packets(1);
222   TracePacket& out_packet = out_packets[0];
223   out_packet.AddSlice(preamble.data(), preamble_size);
224   out_packet.AddSlice(start_, size);
225 
226   if (!writer_->WritePackets(out_packets))
227     return false;
228 
229   is_compressing_ = false;
230   pending_bytes_ = 0;
231   CheckEq(deflateEnd(&stream_), Z_OK);
232   return true;
233 }
234 
CheckEq(int actual_code,int expected_code)235 void ZipPacketWriter::CheckEq(int actual_code, int expected_code) {
236   if (actual_code == expected_code)
237     return;
238   PERFETTO_FATAL("Expected %d got %d: %s", actual_code, expected_code,
239                  stream_.msg);
240 }
241 
Deflate(const uint8_t * ptr,size_t size)242 void ZipPacketWriter::Deflate(const uint8_t* ptr, size_t size) {
243   PERFETTO_CHECK(is_compressing_);
244   stream_.next_in = const_cast<uint8_t*>(ptr);
245   stream_.avail_in = static_cast<unsigned int>(size);
246   CheckEq(deflate(&stream_, Z_NO_FLUSH), Z_OK);
247   PERFETTO_CHECK(stream_.avail_in == 0);
248   pending_bytes_ += size;
249 }
250 
251 #endif  // PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
252 
253 }  // namespace
254 
PacketWriter()255 PacketWriter::PacketWriter() {}
256 
~PacketWriter()257 PacketWriter::~PacketWriter() {}
258 
CreateFilePacketWriter(FILE * fd)259 std::unique_ptr<PacketWriter> CreateFilePacketWriter(FILE* fd) {
260   return std::unique_ptr<PacketWriter>(new FilePacketWriter(fd));
261 }
262 
263 #if PERFETTO_BUILDFLAG(PERFETTO_ZLIB)
CreateZipPacketWriter(std::unique_ptr<PacketWriter> writer)264 std::unique_ptr<PacketWriter> CreateZipPacketWriter(
265     std::unique_ptr<PacketWriter> writer) {
266   return std::unique_ptr<PacketWriter>(new ZipPacketWriter(std::move(writer)));
267 }
268 #endif
269 
270 }  // namespace perfetto
271