1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
18 #define SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
19 
20 #include "perfetto/base/optional.h"
21 #include "perfetto/base/unix_socket.h"
22 #include "perfetto/base/utils.h"
23 #include "src/profiling/memory/scoped_spinlock.h"
24 
25 #include <atomic>
26 #include <map>
27 #include <memory>
28 
29 #include <stdint.h>
30 
31 namespace perfetto {
32 namespace profiling {
33 
34 // A concurrent, multi-writer single-reader ring buffer FIFO, based on a
35 // circular buffer over shared memory. It has similar semantics to a SEQ_PACKET
36 // + O_NONBLOCK socket, specifically:
37 //
38 // - Writes are atomic, data is either written fully in the buffer or not.
39 // - New writes are discarded if the buffer is full.
40 // - If a write succeeds, the reader is guaranteed to see the whole buffer.
41 // - Reads are atomic, no fragmentation.
42 // - The reader sees writes in write order (% discarding).
43 //
44 // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
45 // *IMPORTANT*: The ring buffer must be written under the assumption that the
46 // other end modifies arbitrary shared memory without holding the spin-lock.
47 // This means we must make local copies of read and write pointers for doing
48 // bounds checks followed by reads / writes, as they might change in the
49 // meantime.
50 // !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
51 //
52 // TODO:
53 // - Write a benchmark.
54 class SharedRingBuffer {
55  public:
56   class Buffer {
57    public:
Buffer()58     Buffer() {}
Buffer(uint8_t * d,size_t s)59     Buffer(uint8_t* d, size_t s) : data(d), size(s) {}
60 
61     Buffer(const Buffer&) = delete;
62     Buffer& operator=(const Buffer&) = delete;
63 
64     Buffer(Buffer&&) = default;
65     Buffer& operator=(Buffer&&) = default;
66 
67     operator bool() const { return data != nullptr; }
68 
69     uint8_t* data = nullptr;
70     size_t size = 0;
71   };
72 
73   struct Stats {
74     uint64_t bytes_written;
75     uint64_t num_writes_succeeded;
76     uint64_t num_writes_corrupt;
77     uint64_t num_writes_overflow;
78 
79     uint64_t num_reads_succeeded;
80     uint64_t num_reads_corrupt;
81     uint64_t num_reads_nodata;
82 
83     // Fields below get set by GetStats as copies of atomics in MetadataPage.
84     uint64_t failed_spinlocks;
85   };
86 
87   static base::Optional<SharedRingBuffer> Create(size_t);
88   static base::Optional<SharedRingBuffer> Attach(base::ScopedFile);
89 
90   ~SharedRingBuffer();
91   SharedRingBuffer() = default;
92 
93   SharedRingBuffer(SharedRingBuffer&&) noexcept;
94   SharedRingBuffer& operator=(SharedRingBuffer&&);
95 
is_valid()96   bool is_valid() const { return !!mem_; }
size()97   size_t size() const { return size_; }
fd()98   int fd() const { return *mem_fd_; }
99 
100   Buffer BeginWrite(const ScopedSpinlock& spinlock, size_t size);
101   void EndWrite(Buffer buf);
102 
103   Buffer BeginRead();
104   void EndRead(Buffer);
105 
GetStats(ScopedSpinlock & spinlock)106   Stats GetStats(ScopedSpinlock& spinlock) {
107     PERFETTO_DCHECK(spinlock.locked());
108     Stats stats = meta_->stats;
109     stats.failed_spinlocks =
110         meta_->failed_spinlocks.load(std::memory_order_relaxed);
111     return stats;
112   }
113 
114   // This is used by the caller to be able to hold the SpinLock after
115   // BeginWrite has returned. This is so that additional bookkeeping can be
116   // done under the lock. This will be used to increment the sequence_number.
AcquireLock(ScopedSpinlock::Mode mode)117   ScopedSpinlock AcquireLock(ScopedSpinlock::Mode mode) {
118     auto lock = ScopedSpinlock(&meta_->spinlock, mode);
119     if (PERFETTO_UNLIKELY(!lock.locked()))
120       meta_->failed_spinlocks.fetch_add(1, std::memory_order_relaxed);
121     return lock;
122   }
123 
124   // Exposed for fuzzers.
125   struct MetadataPage {
126     alignas(uint64_t) std::atomic<bool> spinlock;
127     uint64_t read_pos;
128     uint64_t write_pos;
129 
130     std::atomic<uint64_t> failed_spinlocks;
131     Stats stats;
132   };
133 
134  private:
135   struct PointerPositions {
136     uint64_t read_pos;
137     uint64_t write_pos;
138   };
139 
140   struct CreateFlag {};
141   struct AttachFlag {};
142   SharedRingBuffer(const SharedRingBuffer&) = delete;
143   SharedRingBuffer& operator=(const SharedRingBuffer&) = delete;
144   SharedRingBuffer(CreateFlag, size_t size);
SharedRingBuffer(AttachFlag,base::ScopedFile mem_fd)145   SharedRingBuffer(AttachFlag, base::ScopedFile mem_fd) {
146     Initialize(std::move(mem_fd));
147   }
148 
149   void Initialize(base::ScopedFile mem_fd);
150   bool IsCorrupt(const PointerPositions& pos);
151 
GetPointerPositions(const ScopedSpinlock & lock)152   inline base::Optional<PointerPositions> GetPointerPositions(
153       const ScopedSpinlock& lock) {
154     PERFETTO_DCHECK(lock.locked());
155 
156     PointerPositions pos;
157     pos.read_pos = meta_->read_pos;
158     pos.write_pos = meta_->write_pos;
159 
160     base::Optional<PointerPositions> result;
161     if (IsCorrupt(pos))
162       return result;
163     result = pos;
164     return result;
165   }
166 
read_avail(const PointerPositions & pos)167   inline size_t read_avail(const PointerPositions& pos) {
168     PERFETTO_DCHECK(pos.write_pos >= pos.read_pos);
169     auto res = static_cast<size_t>(pos.write_pos - pos.read_pos);
170     PERFETTO_DCHECK(res <= size_);
171     return res;
172   }
173 
write_avail(const PointerPositions & pos)174   inline size_t write_avail(const PointerPositions& pos) {
175     return size_ - read_avail(pos);
176   }
177 
at(uint64_t pos)178   inline uint8_t* at(uint64_t pos) { return mem_ + (pos & (size_ - 1)); }
179 
180   base::ScopedFile mem_fd_;
181   MetadataPage* meta_ = nullptr;  // Start of the mmaped region.
182   uint8_t* mem_ = nullptr;  // Start of the contents (i.e. meta_ + kPageSize).
183 
184   // Size of the ring buffer contents, without including metadata or the 2nd
185   // mmap.
186   size_t size_ = 0;
187 
188   // Remember to update the move ctor when adding new fields.
189 };
190 
191 }  // namespace profiling
192 }  // namespace perfetto
193 
194 #endif  // SRC_PROFILING_MEMORY_SHARED_RING_BUFFER_H_
195