/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/profiler/internal/traceme_recorder.h"

// To avoid unnecessary synchronization between threads, each thread has a
// ThreadLocalRecorder that independently records its events.
//
// Events are stored in an EventQueue implemented as a linked-list of blocks,
// with start and end pointers:
//  [ events........ | next-]--> [ events......... | next ]
//  ^start_block  ^start         ^end_block  ^end
//
// Record() writes at end, and then advances it, allocating a block if needed.
// Clear() takes ownership of events in the range [start, end).
// The end pointer is atomic so these can be concurrent.
//
// If a thread dies, the ThreadLocalRecorder's destructor hands its data off to
// the orphaned_events list.
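//
// A minimal sketch of the intended call pattern (illustrative only; real
// callers live in the profiler session code, and `event` stands for a fully
// populated TraceMeRecorder::Event):
//   TraceMeRecorder::Start(/*level=*/1);          // enable tracing
//   TraceMeRecorder::Record(std::move(event));    // any thread, lock-free
//   TraceMeRecorder::Events events = TraceMeRecorder::Stop();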

#include <algorithm>
#include <atomic>
#include <string>
#include <vector>

#include "absl/container/flat_hash_map.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/stream_executor/lib/initialize.h"

namespace tensorflow {
namespace profiler {

// Default value for g_trace_level when tracing is disabled.
constexpr static int kTracingDisabled = -1;

namespace internal {
std::atomic<int> g_trace_level = ATOMIC_VAR_INIT(kTracingDisabled);
}  // namespace internal

namespace {

class ThreadLocalRecorder;

struct Data {
  // Lock taken only for rare events: tracing start/stop and thread death.
  mutex global_lock;
  // Maps thread id to the thread's ThreadLocalRecorder (a static thread_local
  // instance that stores that thread's trace events).
  absl::flat_hash_map<uint64, ThreadLocalRecorder*> threads
      GUARDED_BY(global_lock);
  // Events traced from threads that died during tracing.
  TraceMeRecorder::Events orphaned_events GUARDED_BY(global_lock);
}* g_data = nullptr;

// A single-producer single-consumer queue of Events.
// Only the owner thread can write events; writing is lock-free.
// Consume() is lock-free as well.
//
// Internally, we have a linked list of blocks containing numbered slots.
// start is the first occupied slot, end is the first unoccupied slot.
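//
// Sketch of the contract (illustrative; `event` is a placeholder for a
// TraceMeRecorder::Event):
//   EventQueue queue;                // owned by the producer thread
//   queue.Push(std::move(event));    // producer thread only, lock-free
//   auto events = queue.Consume();   // consumer thread, may run concurrently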
class EventQueue {
 public:
  EventQueue()
      : start_block_(new Block{0, nullptr}), end_block_(start_block_) {}

  // REQUIRES: Consume() was called since the last Push().
  // The destructor frees the one remaining (empty) block. It needs no global
  // lock: any stored trace events were already discarded by Consume(), and we
  // assume this class is destroyed only after the last Push() has returned.
  ~EventQueue() {
    DCHECK_EQ(start_, end_.load()) << "EventQueue destroyed without Consume()";
    delete end_block_;
  }

  // Add a new event to the back of the queue. Fast and lock-free.
  void Push(TraceMeRecorder::Event&& event) {
    // Only the producer mutates end_, so a relaxed load suffices here.
    uint64 end = end_.load(std::memory_order_relaxed);
    new (&end_block_->events[end++ - end_block_->start].event)
        TraceMeRecorder::Event(std::move(event));
    // If this block is now full, chain a fresh block before publishing.
    if (ABSL_PREDICT_FALSE(end - end_block_->start == Block::kLength)) {
      auto* new_block = new Block{end, nullptr};
      end_block_->next = new_block;
      end_block_ = new_block;
    }
    end_.store(end, std::memory_order_release);  // Write index after contents.
  }

  // Retrieve and remove all events in the queue.
  std::vector<TraceMeRecorder::Event> Consume() {
    // Read index before contents: the acquire pairs with the release store in
    // Push(), so every event below the index is fully constructed.
    uint64 end = end_.load(std::memory_order_acquire);
    std::vector<TraceMeRecorder::Event> result;
    result.reserve(end - start_);
    while (start_ != end) {
      Shift(&result);
    }
    return result;
  }

 private:
  // Shift one event off the front of the queue into *out.
  void Shift(std::vector<TraceMeRecorder::Event>* out) {
    // Move the next event into the output.
    auto& event = start_block_->events[start_++ - start_block_->start].event;
    out->push_back(std::move(event));
    event.~Event();  // Events must be individually destroyed.
    // If we reach the end of a block, we own it and should delete it.
    // The next block is present: end always points to something.
    if (start_ - start_block_->start == Block::kLength) {
      auto* next_block = start_block_->next;
      delete start_block_;
      start_block_ = next_block;
    }
  }

  // The number of slots in a block. Chosen so that the block fits in 64k.
  struct Block {
    static constexpr size_t kLength =
        ((1 << 16) - (sizeof(uint64) + sizeof(std::atomic<Block*>))) /
        sizeof(TraceMeRecorder::Event);
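    // Worked example (hypothetical sizes, on a typical 64-bit platform): with
    // an 8-byte start, an 8-byte next pointer, and a 48-byte Event,
    // kLength = (65536 - 16) / 48 = 1365 slots per block.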

    const uint64 start;  // The number of the first slot.
    Block* next;
    // Defer construction of Event until the data is available.
    // Must also destroy manually, as the block may not fill entirely.
    union MaybeEvent {
      MaybeEvent() {}
      ~MaybeEvent() {}
      TraceMeRecorder::Event event;
    } events[kLength];
  };

  // Head of list for reading. Only accessed by consumer thread.
  Block* start_block_;
  uint64 start_ = 0;
  // Tail of list for writing. Accessed by producer thread.
  Block* end_block_;
  std::atomic<uint64> end_ = {0};  // Atomic: also read by consumer thread.
};

class ThreadLocalRecorder {
 public:
  // The recorder is created the first time Record() is called on a thread.
  ThreadLocalRecorder() {
    auto* env = Env::Default();
    info_.tid = env->GetCurrentThreadId();
    env->GetCurrentThreadName(&info_.name);
    mutex_lock lock(g_data->global_lock);
    g_data->threads.emplace(info_.tid, this);
  }

  // The destructor is called when the thread exits, possibly while tracing is
  // still active. We unregister this thread and move its events to
  // orphaned_events.
  ~ThreadLocalRecorder() {
    mutex_lock lock(g_data->global_lock);
    g_data->threads.erase(info_.tid);
    g_data->orphaned_events.push_back(Clear());
  }

  // This is the performance-critical part!
  void Record(TraceMeRecorder::Event&& event) { queue_.Push(std::move(event)); }

  TraceMeRecorder::ThreadEvents Clear()
      EXCLUSIVE_LOCKS_REQUIRED(g_data->global_lock) {
    return {info_, queue_.Consume()};
  }

 private:
  TraceMeRecorder::ThreadInfo info_;
  EventQueue queue_;
};

// Gather events from all active threads, and clear their buffers. The global
// lock is held, so no threads can be added or removed while we consume the
// collected trace entries. This blocks newly created threads as well as the
// starting and stopping of TraceMeRecorder, so it is performance-critical and
// must be kept fast.
TraceMeRecorder::Events Clear() EXCLUSIVE_LOCKS_REQUIRED(g_data->global_lock) {
  TraceMeRecorder::Events result;
  std::swap(g_data->orphaned_events, result);
  for (const auto& entry : g_data->threads) {
    auto* recorder = entry.second;
    result.push_back(recorder->Clear());
  }
  return result;
}

}  // namespace

bool TraceMeRecorder::Start(int level) {
  level = std::max(0, level);
  mutex_lock lock(g_data->global_lock);
  int expected = kTracingDisabled;
  if (!internal::g_trace_level.compare_exchange_strong(
          expected, level, std::memory_order_acq_rel)) {
    return false;
  }
  // We may have old events in buffers because Record() raced with Stop().
  Clear();
  return true;
}

void TraceMeRecorder::Record(Event event) {
  static thread_local ThreadLocalRecorder thread_local_recorder;
  thread_local_recorder.Record(std::move(event));
}

// Only one thread is expected to call Stop(): the first XprofSession instance
// prevents any other XprofSession from doing any profiling.
TraceMeRecorder::Events TraceMeRecorder::Stop() {
  mutex_lock lock(g_data->global_lock);
  if (internal::g_trace_level.exchange(
          kTracingDisabled, std::memory_order_acq_rel) == kTracingDisabled) {
    return {};
  }
  return Clear();
}

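// Collect() drains the per-thread buffers without stopping tracing: unlike
// Stop(), it leaves g_trace_level unchanged, so recording continues after the
// collected events are returned.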
TraceMeRecorder::Events TraceMeRecorder::Collect() {
  mutex_lock lock(g_data->global_lock);
  if (internal::g_trace_level.load(std::memory_order_acquire) ==
      kTracingDisabled) {
    return {};
  }
  return Clear();
}

}  // namespace profiler
}  // namespace tensorflow

REGISTER_MODULE_INITIALIZER(traceme_recorder, {
  tensorflow::profiler::g_data = new tensorflow::profiler::Data();

  // Workaround for b/35097229: the first block-scoped thread_local can
  // trigger false positives in the heap checker. Currently triggered by
  // //perftools/accelerators/xprof/xprofilez/integration_tests:xla_hlo_trace_test
  static thread_local tensorflow::string fix_deadlock ABSL_ATTRIBUTE_UNUSED;
});