1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #ifndef TENSORFLOW_CORE_COMMON_RUNTIME_DEVICE_DEVICE_EVENT_MGR_H_
17 #define TENSORFLOW_CORE_COMMON_RUNTIME_DEVICE_DEVICE_EVENT_MGR_H_
18 
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <utility>
#include <vector>

#include "tensorflow/core/framework/log_memory.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/lib/core/notification.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/lib/gtl/inlined_vector.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"
31 
// Forward declarations of the StreamExecutor types that EventMgr refers to
// only through pointers.
namespace stream_executor {
class StreamExecutor;
class Stream;
class Event;
}  // namespace stream_executor
37 
38 namespace tensorflow {
39 
40 // TODO(annarev): Check if we can use a more general option representation here
41 // that could work for other device types as well.
42 class GPUOptions;
43 
44 // The callback provided to EventMgr::ThenExecute must not block or take a long
45 // time.  If it does, performance may be impacted and device memory may be
46 // exhausted.  This macro is for checking that an EventMgr thread is not
47 // accidentally entering blocking parts of the code, e.g. the RPC subsystem.
48 //
49 // Intended use is something like
50 //
51 //   void RespondToAnRPC(Params* params) {
52 //      WARN_IF_IN_EVENT_MGR_THREAD;
53 //      if (params->status.ok()) { ...
54 //
namespace device_event_mgr {
// Logs a stack trace if the current thread is one of the EventMgr threads.
// If `f` is not nullptr, calls `f` instead of logging the stack trace.
void WarnIfInCallback(std::function<void()> f);
}  // namespace device_event_mgr
// The call is fully qualified so the macro expands correctly no matter which
// namespace it is used from.
#define WARN_IF_IN_EVENT_MGR_THREAD \
  ::tensorflow::device_event_mgr::WarnIfInCallback(nullptr)
62 
63 // An object to keep track of pending Events in the StreamExecutor streams
64 // and associated Tensors that cannot safely be deleted until the associated
65 // Events are recorded.
66 class EventMgr {
67  public:
68   virtual ~EventMgr();
69 
70   // Execute func when all pending stream actions have completed.
71   // func must be brief and non-blocking since it executes in the one
72   // thread used for all such callbacks and also buffer deletions.
ThenExecute(se::Stream * stream,std::function<void ()> func)73   inline void ThenExecute(se::Stream* stream, std::function<void()> func) {
74     ToFreeVector to_free;
75     {
76       mutex_lock l(mu_);
77       QueueFunc(stream, std::move(func));
78       PollEvents(false, &to_free);
79     }
80     FreeMemory(to_free);
81   }
82 
83  private:
84   friend class TEST_EventMgr;
85   friend class TEST_EventMgrHelper;
86   friend class EventMgrFactory;
87   se::StreamExecutor* const exec_;
88   const int32 polling_active_delay_usecs_;
89   mutex mu_;
90   condition_variable events_pending_ TF_GUARDED_BY(mu_);
91 
92   struct InUse {
93     se::Event* event;
94     std::function<void()> func;
95   };
96 
97   typedef gtl::InlinedVector<InUse, 4> ToFreeVector;
98 
99   EventMgr(se::StreamExecutor* se, const GPUOptions& gpu_options);
100 
FreeMemory(const ToFreeVector & to_free)101   void FreeMemory(const ToFreeVector& to_free) {
102     for (const auto& iu : to_free) {
103       // The function must be called in another thread.
104       if (iu.func != nullptr) threadpool_.Schedule(iu.func);
105     }
106   }
107 
108   // Stream-enqueue an unused Event and save with it a collection of
109   // Tensors and/or a BufRec to be deleted only after the Event
110   // records.
111   void QueueInUse(se::Stream* stream, InUse in_use)
112       TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
113 
QueueFunc(se::Stream * stream,std::function<void ()> func)114   void QueueFunc(se::Stream* stream, std::function<void()> func)
115       TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
116     QueueInUse(stream, {nullptr, std::move(func)});
117   }
118 
119   // This function should be called at roughly the same tempo as
120   // QueueTensors() to check whether pending events have recorded,
121   // and then retire them.  It appends InUse elements that need cleanup
122   // to "*to_free".  The caller should call FreeMemory(to_free)
123   // when this returns.
124   void PollEvents(bool is_dedicated_poller, ToFreeVector* to_free)
125       TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);
126 
127   // An internal polling loop that runs at a low frequency to clear
128   // straggler Events.
129   void PollLoop();
130 
131   // Setup/Teardown functions for the polling loop.
132   void StartPollingLoop();
133   void StopPollingLoop();
134 
135   // A stack of unused events
136   std::vector<se::Event*> free_events_ TF_GUARDED_BY(mu_);
137 
138   // A FIFO queue of InUse events and associated tensors.
139   std::deque<InUse> used_events_ TF_GUARDED_BY(mu_);
140 
141   bool stop_polling_ TF_GUARDED_BY(mu_);
142   std::unique_ptr<Notification> polling_stopped_;
143 
144   // The main PollLoop for the event manager runs in this threadpool.
145   thread::ThreadPool threadpool_;
146 };
147 
148 // Manages all the EventMgr instances.
149 class EventMgrFactory {
150  public:
151   static EventMgrFactory* Singleton();
152 
153   EventMgr* GetEventMgr(se::StreamExecutor* se, const GPUOptions& gpu_options);
154 
155  private:
156   mutex mu_;
157 
158   // Maintain one EventMgr per physical device (StreamExecutor is
159   // per-physical-device).
160   std::map<se::StreamExecutor*, EventMgr*> event_mgr_map_ TF_GUARDED_BY(mu_);
161 };
162 
163 }  // namespace tensorflow
164 #endif  // TENSORFLOW_CORE_COMMON_RUNTIME_DEVICE_DEVICE_EVENT_MGR_H_
165