/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
#include "tensorflow/core/profiler/internal/traceme_recorder.h"

// To avoid unnecessary synchronization between threads, each thread has a
// ThreadLocalRecorder that independently records its events.
//
// Events are stored in an EventQueue implemented as a linked-list of blocks,
// with start and end pointers:
//   [ events........ | next-]--> [ events......... | next ]
//   ^start_block  ^start          ^end_block   ^end
//
// Record() writes at end, and then advances it, allocating a block if needed.
// Clear() takes ownership of events in the range [start, end).
// The end pointer is atomic so these can be concurrent.
//
// If a thread dies, the ThreadLocalRecorder's destructor hands its data off to
// the orphaned_events list.
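//
// A usage sketch of the public API defined below (illustrative only; the
// fields of TraceMeRecorder::Event are declared in traceme_recorder.h and
// elided here):
//
//   TraceMeRecorder::Start(/*level=*/1);        // enable tracing
//   TraceMeRecorder::Record(std::move(event));  // hot path, once per event
//   auto partial = TraceMeRecorder::Collect();  // drain without stopping
//   auto rest = TraceMeRecorder::Stop();        // disable tracing and drain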

#include <algorithm>
#include <string>
#include "absl/container/flat_hash_map.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/stream_executor/lib/initialize.h"

namespace tensorflow {
namespace profiler {

// Default value for g_trace_level when tracing is disabled.
constexpr static int kTracingDisabled = -1;

namespace internal {
std::atomic<int> g_trace_level = ATOMIC_VAR_INIT(kTracingDisabled);
}  // namespace internal
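
// A sketch of how callers are expected to gate on g_trace_level before
// recording (the actual helper lives in traceme_recorder.h; shown here only
// to illustrate the sentinel's role, not as the verbatim declaration):
//
//   bool Active(int level) {
//     return internal::g_trace_level.load(std::memory_order_acquire) >= level;
//   }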

namespace {

class ThreadLocalRecorder;

struct Data {
  // Lock for only rare events: start/stop, thread death.
  mutex global_lock;
  // Map of the thread_local ThreadLocalRecorder instances, keyed by thread
  // ID; each recorder stores the trace events of its thread.
  absl::flat_hash_map<uint64, ThreadLocalRecorder*> threads
      GUARDED_BY(global_lock);
  // Events traced from threads that died during tracing.
  TraceMeRecorder::Events orphaned_events GUARDED_BY(global_lock);
}* g_data = nullptr;

// A single-producer single-consumer queue of Events.
// Only the owner thread can write events; writing is lock-free.
// Consuming is lock-free as well.
//
// Internally, we have a linked list of blocks containing numbered slots.
// start is the first occupied slot; end is the first unoccupied slot.
class EventQueue {
 public:
  EventQueue()
      : start_block_(new Block{0, nullptr}), end_block_(start_block_) {}

  // REQUIRES: Consume() was called since the last Push().
  // Memory is deallocated and trace events are destroyed on destruction.
  // This doesn't require the global lock: it discards all stored trace
  // events, and we assume this class is destroyed only after the last Push()
  // has been called.
  ~EventQueue() {
    DCHECK_EQ(start_, end_.load()) << "EventQueue destroyed without Consume()";
    delete end_block_;
  }

  // Add a new event to the back of the queue. Fast and lock-free.
  void Push(TraceMeRecorder::Event&& event) {
    uint64 end = end_.load(std::memory_order_relaxed);
    new (&end_block_->events[end++ - end_block_->start].event)
        TraceMeRecorder::Event(std::move(event));
    if (ABSL_PREDICT_FALSE(end - end_block_->start == Block::kLength)) {
      auto* new_block = new Block{end, nullptr};
      end_block_->next = new_block;
      end_block_ = new_block;
    }
    end_.store(end, std::memory_order_release);  // Write index after contents.
  }

  // Retrieve and remove all events in the queue.
  std::vector<TraceMeRecorder::Event> Consume() {
    // Read index before contents.
    uint64 end = end_.load(std::memory_order_acquire);
    std::vector<TraceMeRecorder::Event> result;
    result.reserve(end - start_);
    while (start_ != end) {
      Shift(&result);
    }
    return result;
  }
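
  // Why the above is safe without a lock (an explanatory sketch, not extra
  // code): the release store at the end of Push() pairs with the acquire load
  // at the top of Consume(), so any slot the consumer observes via end_ is
  // fully constructed before it is read:
  //
  //   // Producer (owner thread):
  //   new (&slot.event) TraceMeRecorder::Event(...);  // 1. construct event
  //   end_.store(end, std::memory_order_release);     // 2. publish index
  //
  //   // Consumer:
  //   uint64 end = end_.load(std::memory_order_acquire);  // sees 2 => sees 1
  //   // ...read slots in [start_, end)...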

 private:
  // Shift one event off the front of the queue into *out.
  void Shift(std::vector<TraceMeRecorder::Event>* out) {
    // Move the next event into the output.
    auto& event = start_block_->events[start_++ - start_block_->start].event;
    out->push_back(std::move(event));
    event.~Event();  // Events must be individually destroyed.
    // If we reach the end of a block, we own it and should delete it.
    // The next block is present: end always points to something.
    if (start_ - start_block_->start == Block::kLength) {
      auto* next_block = start_block_->next;
      delete start_block_;
      start_block_ = next_block;
    }
  }

  // The number of slots in a block. Chosen so that the block fits in 64k.
  struct Block {
    static constexpr size_t kLength =
        ((1 << 16) - (sizeof(uint64) + sizeof(std::atomic<Block*>))) /
        sizeof(TraceMeRecorder::Event);
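
    // For illustration only: if sizeof(TraceMeRecorder::Event) were 64 bytes,
    // this would be (65536 - 16) / 64 = 1023 slots per block. The actual
    // value depends on the Event definition in traceme_recorder.h.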

    const uint64 start;  // The number of the first slot.
    Block* next;
    // Defer construction of Event until the data is available.
    // Must also destroy manually, as the block may not fill entirely.
    union MaybeEvent {
      MaybeEvent() {}
      ~MaybeEvent() {}
      TraceMeRecorder::Event event;
    } events[kLength];
  };

  // Head of list for reading. Only accessed by consumer thread.
  Block* start_block_;
  uint64 start_ = 0;
  // Tail of list for writing. Accessed by producer thread.
  Block* end_block_;
  std::atomic<uint64> end_ = {0};  // Atomic: also read by consumer thread.
};

class ThreadLocalRecorder {
 public:
  // The recorder is created the first time Record() is called on a thread.
  ThreadLocalRecorder() {
    auto* env = Env::Default();
    info_.tid = env->GetCurrentThreadId();
    env->GetCurrentThreadName(&info_.name);
    mutex_lock lock(g_data->global_lock);
    g_data->threads.emplace(info_.tid, this);
  }

  // The destructor is called when the thread shuts down early.
  // We unregister this thread, and move its events to orphaned_events.
  ~ThreadLocalRecorder() {
    mutex_lock lock(g_data->global_lock);
    g_data->threads.erase(info_.tid);
    g_data->orphaned_events.push_back(Clear());
  }

  // This is the performance-critical part!
  void Record(TraceMeRecorder::Event&& event) { queue_.Push(std::move(event)); }

  TraceMeRecorder::ThreadEvents Clear()
      EXCLUSIVE_LOCKS_REQUIRED(g_data->global_lock) {
    return {info_, queue_.Consume()};
  }

 private:
  TraceMeRecorder::ThreadInfo info_;
  EventQueue queue_;
};

// Gather events from all active threads, and clear their buffers. The global
// lock is held, so no threads can be added or removed while we consume the
// collected trace entries. This blocks any newly started thread, as well as
// the starting and stopping of TraceMeRecorder; hence it is performance
// critical and should be kept fast.
TraceMeRecorder::Events Clear() EXCLUSIVE_LOCKS_REQUIRED(g_data->global_lock) {
  TraceMeRecorder::Events result;
  std::swap(g_data->orphaned_events, result);
  for (const auto& entry : g_data->threads) {
    auto* recorder = entry.second;
    result.push_back(recorder->Clear());
  }
  return result;
}

}  // namespace

bool TraceMeRecorder::Start(int level) {
  level = std::max(0, level);
  mutex_lock lock(g_data->global_lock);
  int expected = kTracingDisabled;
  if (!internal::g_trace_level.compare_exchange_strong(
          expected, level, std::memory_order_acq_rel)) {
    return false;
  }
  // We may have old events in buffers because Record() raced with Stop().
  Clear();
  return true;
}

void TraceMeRecorder::Record(Event event) {
  static thread_local ThreadLocalRecorder thread_local_recorder;
  thread_local_recorder.Record(std::move(event));
}
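
// Note on the function-local static thread_local above (an explanatory note,
// not additional behavior): the ThreadLocalRecorder is constructed on each
// thread's first Record() call, registering the thread in g_data->threads;
// its destructor at thread exit moves any remaining events to
// g_data->orphaned_events.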

// Only one thread is expected to call Stop(), as the first instance of
// XprofSession prevents another XprofSession from doing any profiling.
TraceMeRecorder::Events TraceMeRecorder::Stop() {
  mutex_lock lock(g_data->global_lock);
  if (internal::g_trace_level.exchange(
          kTracingDisabled, std::memory_order_acq_rel) == kTracingDisabled) {
    return {};
  }
  return Clear();
}

TraceMeRecorder::Events TraceMeRecorder::Collect() {
  mutex_lock lock(g_data->global_lock);
  if (internal::g_trace_level.load(std::memory_order_acquire) ==
      kTracingDisabled) {
    return {};
  }
  return Clear();
}

}  // namespace profiler
}  // namespace tensorflow

REGISTER_MODULE_INITIALIZER(traceme_recorder, {
  tensorflow::profiler::g_data = new tensorflow::profiler::Data();

  // Workaround for b/35097229: the first block-scoped thread_local can
  // trigger false positives in the heap checker. Currently triggered by
  // //perftools/accelerators/xprof/xprofilez/integration_tests:xla_hlo_trace_test
  static thread_local tensorflow::string fix_deadlock ABSL_ATTRIBUTE_UNUSED;
});