1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
18 #define SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
19 
20 #include <stdint.h>
21 
22 #include <functional>
23 #include <map>
24 #include <memory>
25 #include <mutex>
26 #include <vector>
27 
28 #include "perfetto/ext/base/weak_ptr.h"
29 #include "perfetto/ext/tracing/core/basic_types.h"
30 #include "perfetto/ext/tracing/core/shared_memory_abi.h"
31 #include "perfetto/ext/tracing/core/shared_memory_arbiter.h"
32 #include "perfetto/tracing/core/forward_decls.h"
33 #include "src/tracing/core/id_allocator.h"
34 
35 namespace perfetto {
36 
37 class PatchList;
38 class Patch;
39 class TraceWriter;
40 class TraceWriterImpl;
41 
42 namespace base {
43 class TaskRunner;
44 }  // namespace base
45 
46 // This class handles the shared memory buffer on the producer side. It is used
47 // to obtain thread-local chunks and to partition pages from several threads.
48 // There is one arbiter instance per Producer.
49 // This class is thread-safe and uses locks to do so. Data sources are supposed
50 // to interact with this sporadically, only when they run out of space on their
51 // current thread-local chunk.
52 //
53 // When the arbiter is created using CreateUnboundInstance(), the following
54 // state transitions are possible:
55 //
56 //   [ !fully_bound_, !endpoint_, 0 unbound buffer reservations ]
57 //       |     |
58 //       |     | CreateStartupTraceWriter(buf)
59 //       |     |  buffer reservations += buf
60 //       |     |
61 //       |     |             ----
62 //       |     |            |    | CreateStartupTraceWriter(buf)
63 //       |     |            |    |  buffer reservations += buf
64 //       |     V            |    V
65 //       |   [ !fully_bound_, !endpoint_, >=1 unbound buffer reservations ]
66 //       |                                                |
67 //       |                       BindToProducerEndpoint() |
68 //       |                                                |
69 //       | BindToProducerEndpoint()                       |
70 //       |                                                V
71 //       |   [ !fully_bound_, endpoint_, >=1 unbound buffer reservations ]
72 //       |   A    |    A                               |     A
73 //       |   |    |    |                               |     |
74 //       |   |     ----                                |     |
75 //       |   |    CreateStartupTraceWriter(buf)        |     |
76 //       |   |     buffer reservations += buf          |     |
77 //       |   |                                         |     |
78 //       |   | CreateStartupTraceWriter(buf)           |     |
79 //       |   |  where buf is not yet bound             |     |
80 //       |   |  buffer reservations += buf             |     | (yes)
81 //       |   |                                         |     |
82 //       |   |        BindStartupTargetBuffer(buf, id) |-----
83 //       |   |           buffer reservations -= buf    | reservations > 0?
84 //       |   |                                         |
85 //       |   |                                         | (no)
86 //       V   |                                         V
87 //       [ fully_bound_, endpoint_, 0 unbound buffer reservations ]
88 //          |    A
89 //          |    | CreateStartupTraceWriter(buf)
90 //          |    |  where buf is already bound
91 //           ----
92 class SharedMemoryArbiterImpl : public SharedMemoryArbiter {
93  public:
94   // See SharedMemoryArbiter::CreateInstance(). |start|, |size| define the
95   // boundaries of the shared memory buffer. ProducerEndpoint and TaskRunner may
96   // be |nullptr| if created unbound, see
97   // SharedMemoryArbiter::CreateUnboundInstance().
98   SharedMemoryArbiterImpl(void* start,
99                           size_t size,
100                           size_t page_size,
101                           TracingService::ProducerEndpoint*,
102                           base::TaskRunner*);
103 
104   // Returns a new Chunk to write tracing data. Depending on the provided
105   // BufferExhaustedPolicy, this may return an invalid chunk if no valid free
106   // chunk could be found in the SMB.
107   SharedMemoryABI::Chunk GetNewChunk(const SharedMemoryABI::ChunkHeader&,
108                                      BufferExhaustedPolicy,
109                                      size_t size_hint = 0);
110 
111   // Puts back a Chunk that has been completed and sends a request to the
112   // service to move it to the central tracing buffer. |target_buffer| is the
113   // absolute trace buffer ID where the service should move the chunk onto (the
114   // producer is just to copy back the same number received in the
115   // DataSourceConfig upon the StartDataSource() reques).
116   // PatchList is a pointer to the list of patches for previous chunks. The
117   // first patched entries will be removed from the patched list and sent over
118   // to the service in the same CommitData() IPC request.
119   void ReturnCompletedChunk(SharedMemoryABI::Chunk,
120                             MaybeUnboundBufferID target_buffer,
121                             PatchList*);
122 
123   // Send a request to the service to apply completed patches from |patch_list|.
124   // |writer_id| is the ID of the TraceWriter that calls this method,
125   // |target_buffer| is the global trace buffer ID of its target buffer.
126   void SendPatches(WriterID writer_id,
127                    MaybeUnboundBufferID target_buffer,
128                    PatchList* patch_list);
129 
shmem_abi_for_testing()130   SharedMemoryABI* shmem_abi_for_testing() { return &shmem_abi_; }
131 
set_default_layout_for_testing(SharedMemoryABI::PageLayout l)132   static void set_default_layout_for_testing(SharedMemoryABI::PageLayout l) {
133     default_page_layout = l;
134   }
135 
136   // SharedMemoryArbiter implementation.
137   // See include/perfetto/tracing/core/shared_memory_arbiter.h for comments.
138   std::unique_ptr<TraceWriter> CreateTraceWriter(
139       BufferID target_buffer,
140       BufferExhaustedPolicy = BufferExhaustedPolicy::kDefault) override;
141   std::unique_ptr<TraceWriter> CreateStartupTraceWriter(
142       uint16_t target_buffer_reservation_id) override;
143   void BindToProducerEndpoint(TracingService::ProducerEndpoint*,
144                               base::TaskRunner*) override;
145   void BindStartupTargetBuffer(uint16_t target_buffer_reservation_id,
146                                BufferID target_buffer_id) override;
147   void AbortStartupTracingForReservation(
148       uint16_t target_buffer_reservation_id) override;
149   void NotifyFlushComplete(FlushRequestID) override;
150 
151   void SetBatchCommitsDuration(uint32_t batch_commits_duration_ms) override;
152 
153   bool EnableDirectSMBPatching() override;
154 
155   void SetDirectSMBPatchingSupportedByService() override;
156 
157   void FlushPendingCommitDataRequests(
158       std::function<void()> callback = {}) override;
159   bool TryShutdown() override;
160 
task_runner()161   base::TaskRunner* task_runner() const { return task_runner_; }
page_size()162   size_t page_size() const { return shmem_abi_.page_size(); }
num_pages()163   size_t num_pages() const { return shmem_abi_.num_pages(); }
164 
GetWeakPtr()165   base::WeakPtr<SharedMemoryArbiterImpl> GetWeakPtr() const {
166     return weak_ptr_factory_.GetWeakPtr();
167   }
168 
169  private:
170   friend class TraceWriterImpl;
171   friend class StartupTraceWriterTest;
172   friend class SharedMemoryArbiterImplTest;
173 
174   struct TargetBufferReservation {
175     bool resolved = false;
176     BufferID target_buffer = kInvalidBufferId;
177   };
178 
179   // Placeholder for the actual target buffer ID of a startup target buffer
180   // reservation ID in |target_buffer_reservations_|.
181   static constexpr BufferID kInvalidBufferId = 0;
182 
183   static SharedMemoryABI::PageLayout default_page_layout;
184 
185   SharedMemoryArbiterImpl(const SharedMemoryArbiterImpl&) = delete;
186   SharedMemoryArbiterImpl& operator=(const SharedMemoryArbiterImpl&) = delete;
187 
188   void UpdateCommitDataRequest(SharedMemoryABI::Chunk chunk,
189                                WriterID writer_id,
190                                MaybeUnboundBufferID target_buffer,
191                                PatchList* patch_list);
192 
193   // Search the chunks that are being batched in |commit_data_req_| for a chunk
194   // that needs patching and that matches the provided |writer_id| and
195   // |patch.chunk_id|. If found, apply |patch| to that chunk, and if
196   // |chunk_needs_more_patching| is true, clear the needs patching flag of the
197   // chunk and mark it as complete - to allow the service to read it (and other
198   // chunks after it) during scraping. Returns true if the patch was applied,
199   // false otherwise.
200   //
201   // Note: the caller must be holding |lock_| for the duration of the call.
202   bool TryDirectPatchLocked(WriterID writer_id,
203                             const Patch& patch,
204                             bool chunk_needs_more_patching);
205   std::unique_ptr<TraceWriter> CreateTraceWriterInternal(
206       MaybeUnboundBufferID target_buffer,
207       BufferExhaustedPolicy);
208 
209   // Called by the TraceWriter destructor.
210   void ReleaseWriterID(WriterID);
211 
212   void BindStartupTargetBufferImpl(std::unique_lock<std::mutex> scoped_lock,
213                                    uint16_t target_buffer_reservation_id,
214                                    BufferID target_buffer_id);
215 
216   // If any flush callbacks were queued up while the arbiter or any target
217   // buffer reservation was unbound, this wraps the pending callbacks into a new
218   // std::function and returns it. Otherwise returns an invalid std::function.
219   std::function<void()> TakePendingFlushCallbacksLocked();
220 
221   // Replace occurrences of target buffer reservation IDs in |commit_data_req_|
222   // with their respective actual BufferIDs if they were already bound. Returns
223   // true iff all occurrences were replaced.
224   bool ReplaceCommitPlaceholderBufferIdsLocked();
225 
226   // Update and return |fully_bound_| based on the arbiter's |pending_writers_|
227   // state.
228   bool UpdateFullyBoundLocked();
229 
230   const bool initially_bound_;
231 
232   // Only accessed on |task_runner_| after the producer endpoint was bound.
233   TracingService::ProducerEndpoint* producer_endpoint_ = nullptr;
234 
235   // --- Begin lock-protected members ---
236 
237   std::mutex lock_;
238 
239   base::TaskRunner* task_runner_ = nullptr;
240   SharedMemoryABI shmem_abi_;
241   size_t page_idx_ = 0;
242   std::unique_ptr<CommitDataRequest> commit_data_req_;
243   size_t bytes_pending_commit_ = 0;  // SUM(chunk.size() : commit_data_req_).
244   IdAllocator<WriterID> active_writer_ids_;
245   bool did_shutdown_ = false;
246 
247   // Whether the arbiter itself and all startup target buffer reservations are
248   // bound. Note that this can become false again later if a new target buffer
249   // reservation is created by calling CreateStartupTraceWriter() with a new
250   // reservation id.
251   bool fully_bound_;
252 
253   // IDs of writers and their assigned target buffers that should be registered
254   // with the service after the arbiter and/or their startup target buffer is
255   // bound.
256   std::map<WriterID, MaybeUnboundBufferID> pending_writers_;
257 
258   // Callbacks for flush requests issued while the arbiter or a target buffer
259   // reservation was unbound.
260   std::vector<std::function<void()>> pending_flush_callbacks_;
261 
262   // See SharedMemoryArbiter::SetBatchCommitsDuration.
263   uint32_t batch_commits_duration_ms_ = 0;
264 
265   // See SharedMemoryArbiter::EnableDirectSMBPatching.
266   bool direct_patching_enabled_ = false;
267 
268   // See SharedMemoryArbiter::SetDirectSMBPatchingSupportedByService.
269   bool direct_patching_supported_by_service_ = false;
270 
271   // Indicates whether we have already scheduled a delayed flush for the
272   // purposes of batching. Set to true at the beginning of a batching period and
273   // cleared at the end of the period. Immediate flushes that happen during a
274   // batching period will empty the |commit_data_req| (triggering an immediate
275   // IPC to the service), but will not clear this flag and the
276   // previously-scheduled delayed flush will still occur at the end of the
277   // batching period.
278   bool delayed_flush_scheduled_ = false;
279 
280   // Stores target buffer reservations for writers created via
281   // CreateStartupTraceWriter(). A bound reservation sets
282   // TargetBufferReservation::resolved to true and is associated with the actual
283   // BufferID supplied in BindStartupTargetBuffer().
284   //
285   // TODO(eseckler): Clean up entries from this map. This would probably require
286   // a method in SharedMemoryArbiter that allows a producer to invalidate a
287   // reservation ID.
288   std::map<MaybeUnboundBufferID, TargetBufferReservation>
289       target_buffer_reservations_;
290 
291   // --- End lock-protected members ---
292 
293   // Keep at the end.
294   base::WeakPtrFactory<SharedMemoryArbiterImpl> weak_ptr_factory_;
295 };
296 
297 }  // namespace perfetto
298 
299 #endif  // SRC_TRACING_CORE_SHARED_MEMORY_ARBITER_IMPL_H_
300