1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
17 
18 #include "tensorflow/core/platform/stacktrace.h"
19 #include "tensorflow/core/platform/stream_executor.h"
20 #include "tensorflow/core/protobuf/config.pb.h"
21 
22 namespace tensorflow {
23 
namespace {
// Size of the EventMgr's private thread pool: one thread runs the polling
// loop, the other runs event callback functions.
// Open questions:
//  - Is two the right number of threads?
//  - Should a single EventMgr be shared across GPUDevices on a multi-GPU host?
constexpr int kNumThreads = 2;
}  // namespace
31 
32 namespace gpu_event_mgr {
// Per-thread label used to recognise threads owned by the EventMgr.
// Each thread sees its own copy of the label, initialised to "".
class ThreadLabel {
 public:
  // Returns the label previously stored by SetValue() on the calling thread,
  // or "" if this thread never set one.
  static const char* GetValue() {
    const char* current = value_;
    return current;
  }

  // Stores `v` as the calling thread's label. Only the pointer is kept, so
  // `v` must outlive every later GetValue() on this thread; pass a string
  // with static storage duration.
  static void SetValue(const char* v) {
    value_ = v;
  }

 private:
  static thread_local const char* value_;
};
thread_local const char* ThreadLabel::value_ = "";
45 
WarnIfInCallback(std::function<void ()> f)46 void WarnIfInCallback(std::function<void()> f) {
47   const char* label = ThreadLabel::GetValue();
48   if (label && !strcmp(label, "gpu_event_mgr")) {
49     if (f) {
50       f();
51     } else {
52       LOG(WARNING) << "Executing inside EventMgr callback thread: "
53                    << CurrentStackTrace();
54     }
55   }
56 }
57 
InitThreadpoolLabels(thread::ThreadPool * threadpool)58 void InitThreadpoolLabels(thread::ThreadPool* threadpool) {
59   static const char* label = "gpu_event_mgr";
60   mutex mu;
61   int init_count = 0;
62   condition_variable all_initialized;
63   int exit_count = 0;
64   condition_variable ready_to_exit;
65   const int num_threads = threadpool->NumThreads();
66   for (int i = 0; i < num_threads; ++i) {
67     threadpool->Schedule([num_threads, &mu, &init_count, &all_initialized,
68                           &exit_count, &ready_to_exit]() {
69       gpu_event_mgr::ThreadLabel::SetValue(label);
70       mutex_lock l(mu);
71       ++init_count;
72       if (init_count == num_threads) {
73         all_initialized.notify_all();
74       }
75       while (init_count < num_threads) {
76         all_initialized.wait(l);
77       }
78       if (++exit_count == num_threads) {
79         ready_to_exit.notify_all();
80       }
81     });
82   }
83   {
84     mutex_lock l(mu);
85     while (exit_count < num_threads) {
86       ready_to_exit.wait(l);
87     }
88   }
89 }
90 }  // namespace gpu_event_mgr
91 
EventMgr(se::StreamExecutor * se,const GPUOptions & gpu_options)92 EventMgr::EventMgr(se::StreamExecutor* se, const GPUOptions& gpu_options)
93     : exec_(se),
94       deferred_bytes_threshold_(gpu_options.deferred_deletion_bytes()
95                                     ? gpu_options.deferred_deletion_bytes()
96                                     : 8 * 1048576),
97       polling_active_delay_usecs_(gpu_options.polling_active_delay_usecs()
98                                       ? gpu_options.polling_active_delay_usecs()
99                                       : 10),
100       accumulated_stream_(nullptr),
101       accumulated_tensors_(new TensorReferenceVector),
102       accumulated_tensor_bytes_(0),
103       threadpool_(Env::Default(), "GPU_Event_Manager", kNumThreads) {
104   gpu_event_mgr::InitThreadpoolLabels(&threadpool_);
105   StartPollingLoop();
106 }
107 
~EventMgr()108 EventMgr::~EventMgr() {
109   StopPollingLoop();
110 
111   // Events are owned by this object.
112   for (auto& e : free_events_) {
113     delete e;
114   }
115   for (auto& t : *(accumulated_tensors_)) {
116     t.Unref();
117   }
118   delete accumulated_tensors_;
119   while (!used_events_.empty()) {
120     InUse* ue = &used_events_[0];
121     delete ue->event;
122     if (ue->mem != nullptr) {
123       for (auto& t : *(ue->mem)) {
124         t.Unref();
125       }
126       delete ue->mem;
127     }
128     if (ue->bufrec.buf) {
129       if (LogMemory::IsEnabled()) {
130         LogMemory::RecordRawDeallocation(ue->bufrec.operation,
131                                          ue->bufrec.step_id, ue->bufrec.buf,
132                                          ue->bufrec.alloc, false);
133       }
134       ue->bufrec.alloc->DeallocateRaw(ue->bufrec.buf);
135     }
136     if (ue->func != nullptr) threadpool_.Schedule(ue->func);
137     used_events_.pop_front();
138   }
139 }
140 
StartPollingLoop()141 void EventMgr::StartPollingLoop() {
142   CHECK(polling_stopped_ == nullptr);
143   {
144     mutex_lock l(mu_);
145     stop_polling_ = false;
146   }
147   polling_stopped_.reset(new Notification);
148   threadpool_.Schedule([this]() { PollLoop(); });
149 }
150 
StopPollingLoop()151 void EventMgr::StopPollingLoop() {
152   if (polling_stopped_) {
153     {
154       mutex_lock l(mu_);
155       stop_polling_ = true;
156       events_pending_.notify_all();
157     }
158     polling_stopped_->WaitForNotification();
159     polling_stopped_.reset(nullptr);
160   }
161 }
162 
ThenDeleteTensors(se::Stream * stream,const TensorReferenceVector & tensors)163 void EventMgr::ThenDeleteTensors(se::Stream* stream,
164                                  const TensorReferenceVector& tensors) {
165   mutex_lock l(mu_);
166   // TODO(jeff): We currently keep one accumulated_tensors_ object.
167   // If we start to use multiple streams heavily, we might want to keep
168   // separate vectors/byte counters per stream
169   if (!accumulated_tensors_->empty() && stream != accumulated_stream_) {
170     FlushAccumulatedTensors();
171   }
172   accumulated_stream_ = stream;
173   for (const auto& t : tensors) {
174     // accumulated_tensors_ takes over ownership of the reference to "t"
175     accumulated_tensors_->push_back(t);
176     accumulated_tensor_bytes_ += t.TotalBytes();
177   }
178   if (accumulated_tensor_bytes_ >= deferred_bytes_threshold_) {
179     FlushAccumulatedTensors();
180   }
181 }
182 
FlushAccumulatedTensors()183 void EventMgr::FlushAccumulatedTensors() {
184   DCHECK(!accumulated_tensors_->empty());
185   DCHECK(accumulated_stream_ != nullptr);
186   QueueTensors(accumulated_stream_, accumulated_tensors_);
187   accumulated_tensors_ = new TensorReferenceVector;
188   accumulated_tensor_bytes_ = 0;
189   accumulated_stream_ = nullptr;
190 }
191 
192 // A polling loop to detect completion of GPU events.
193 //
194 // While one or more events is outstanding, poll for completed events.  When no
195 // events are outstanding, we sleep until one is enqueued.
PollLoop()196 void EventMgr::PollLoop() {
197   ToFreeVector to_free;
198   while (true) {
199     bool events_still_pending;
200     {
201       mutex_lock l(mu_);
202       if (stop_polling_) {
203         break;
204       }
205       if (used_events_.empty()) {
206         events_pending_.wait(l);
207       }
208       PollEvents(true, &to_free);
209       events_still_pending = !used_events_.empty();
210     }
211     FreeMemory(to_free);
212     to_free.clear();
213 
214     if (events_still_pending) {
215       Env::Default()->SleepForMicroseconds(polling_active_delay_usecs_);
216     }
217   }
218   polling_stopped_->Notify();
219 }
220 
QueueInUse(se::Stream * stream,InUse iu)221 void EventMgr::QueueInUse(se::Stream* stream, InUse iu) {
222   VLOG(2) << "QueueInUse  free_events_ " << free_events_.size()
223           << " used_events_ " << used_events_.size();
224   // Events are created on demand, and repeatedly reused.  There is no
225   // limit placed here on the number of allocated Events.
226   if (free_events_.empty()) {
227     free_events_.push_back(new se::Event(exec_));
228     free_events_.back()->Init();
229   }
230   se::Event* e = free_events_.back();
231   free_events_.pop_back();
232   stream->ThenRecordEvent(e);
233   iu.event = e;
234   bool was_empty = used_events_.empty();
235   used_events_.push_back(iu);
236   // Maybe wake up the polling thread
237   if (was_empty) events_pending_.notify_all();
238 }
239 
240 // This function must be called periodically to check whether pending
241 // events have recorded, and then retire them.  Initial observations
242 // suggest that typical behavior in a TensorFlow program is to have
243 // 0-3 events pending most of the time, but there are occasionally
244 // spikes of up to several hundred outstanding.  (If GPUKernelTracker
245 // is used to cap pending kernels there should never be more than
246 // that many.)
247 //
248 // NOTE: If all events are on the same stream, no later event will
249 // complete before an earlier event, except possibly if the earlier
250 // event transitions to an error state, so there's no advantage in
251 // looking past the first kPending event.  However, if we're using
252 // multiple streams there may be some gain in looking deeper.
253 // As a compromise, PollEvent() calls that are triggered by the queueing
254 // of a single event never look past the first kPending event.  Consequently
255 // those calls do an expected constant amount of work, unaffected by the
256 // length of the pending queue.  Calls coming from the dedicated
257 // polling thread always sweep the full queue.
PollEvents(bool is_dedicated_poller,gtl::InlinedVector<InUse,4> * to_free)258 void EventMgr::PollEvents(bool is_dedicated_poller,
259                           gtl::InlinedVector<InUse, 4>* to_free) {
260   VLOG(2) << "PollEvents  free_events_ " << free_events_.size()
261           << " used_events_ " << used_events_.size();
262   // Sweep the remaining events in order.  If this is the dedicated
263   // polling thread, check the entire set.  Otherwise, just sweep up to
264   // the first non-complete record that is still pending.
265   for (auto& iu : used_events_) {
266     if (iu.event == nullptr) continue;
267     se::Event::Status s = iu.event->PollForStatus();
268     switch (s) {
269       case se::Event::Status::kUnknown:
270       case se::Event::Status::kError:
271         // We don't expect to see these.  Someday maybe propagate
272         // a Status error, but for now fail hard.
273         LOG(FATAL) << "Unexpected Event status: " << static_cast<int>(s);
274         break;
275       case se::Event::Status::kPending:
276         if (!is_dedicated_poller) return;  // quit processing queue
277         break;
278       case se::Event::Status::kComplete:
279         // Make a copy of the InUse record so we can free it after releasing
280         // the lock
281         to_free->push_back(iu);
282         free_events_.push_back(iu.event);
283         // Mark this InUse record as completed.
284         iu.event = nullptr;
285     }
286   }
287   // Then clear any completed InUse records from the front of the queue.
288   while (!used_events_.empty()) {
289     InUse& iu = used_events_.front();
290     if (iu.event == nullptr) {
291       used_events_.pop_front();
292     } else {
293       break;
294     }
295   }
296 }
297 
298 }  // namespace tensorflow
299