/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#if !GOOGLE_CUDA && !TENSORFLOW_USE_ROCM
#error This file must only be included when building with Cuda or ROCm support
#endif

#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_
#define TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/common_runtime/device/device_id_utils.h"
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
#include "tensorflow/core/common_runtime/gpu_device_context.h"
#include "tensorflow/core/common_runtime/local_device.h"
#include "tensorflow/core/common_runtime/scoped_allocator_mgr.h"
#include "tensorflow/core/common_runtime/shared_counter.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/gtl/inlined_vector.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/public/session_options.h"

namespace tensorflow {
class GPUKernelTracker;

class BaseGPUDevice : public LocalDevice {
 public:
  BaseGPUDevice(const SessionOptions& options, const std::string& name,
                Bytes memory_limit, const DeviceLocality& locality,
                TfGpuId tf_gpu_id, const std::string& physical_device_desc,
                Allocator* gpu_allocator, Allocator* cpu_allocator,
                bool sync_every_op);

  ~BaseGPUDevice() override;

  // Initialize the device and return the status of initialization.
  Status Init(const SessionOptions& options);

  void Compute(OpKernel* op_kernel, OpKernelContext* context) override;

  Status Sync() override;

  void ComputeAsync(AsyncOpKernel* op_kernel, OpKernelContext* context,
                    AsyncOpKernel::DoneCallback done) override;

  Status MakeTensorFromProto(const TensorProto& tensor_proto,
                             const AllocatorAttributes alloc_attrs,
                             Tensor* tensor) override;

  void CopyTensorInSameDevice(const Tensor* input_tensor, Tensor* output_tensor,
                              const DeviceContext* device_context,
                              StatusCallback done) override;

  // The caller owns the returned device.
  PerOpGpuDevice* MakeGpuDevice() override;

  Status ReinitializeGpuDevice(OpKernelContext* context, PerOpGpuDevice* device,
                               DeviceContext* dc,
                               Allocator* allocator) override;

  // Returns the platform GPU id of this device within the native driver
  // system; e.g., for CUDA and ROCm this is the ordinal of the GPU within
  // the system.
  int gpu_id() const {
    PlatformGpuId platform_gpu_id;
    TF_CHECK_OK(GpuIdManager::TfToPlatformGpuId(tf_gpu_id_, &platform_gpu_id));
    return platform_gpu_id.value();
  }

  // The executor that provides control for the device; e.g., for CUDA this
  // corresponds to the CUDA context.
  se::StreamExecutor* executor() const { return executor_; }

  Allocator* GetScopedAllocator(AllocatorAttributes attr,
                                int64 step_id) override;

  ScopedAllocatorMgr* GetScopedAllocatorMgr() const override {
    return scoped_allocator_mgr_.get();
  }

  // The following two functions always return 0 unless one of the
  // related experimental config options has been specified.

  // If the returned value is > 0, then GPU memory chunks freed before this
  // count are guaranteed not to be in use by any kernel pending on this
  // device.
  uint64 SafeAllocFrontier(uint64 old_value) override;

  // Returns the number of kernels that have been queued for execution on
  // the compute stream and are not yet known to have completed.
  int PendingKernels();

  int priority() const { return stream_->priority; }

  // Helper method for unit tests to reset the streams. Never use in
  // production.
  static void TestOnlyReset();

 protected:
  Allocator* gpu_allocator_;  // not owned
  Allocator* cpu_allocator_;  // not owned

  se::StreamExecutor* executor_;  // not owned
  std::unique_ptr<ScopedAllocatorMgr> scoped_allocator_mgr_;

 private:
  friend class GPUDeviceTestHelper;
  struct StreamGroup {
    se::Stream* compute = nullptr;
#if TENSORFLOW_USE_ROCM
    se::Stream* nccl = nullptr;
#endif
    se::Stream* host_to_device = nullptr;
    se::Stream* device_to_host = nullptr;
    gtl::InlinedVector<se::Stream*, 4> device_to_device;
    int priority = 0;
  };
  class StreamGroupFactory;

  StreamGroup* stream_;
  mutex scratch_init_mutex_;
  char* scratch_ = nullptr;
  GPUDeviceContext* device_context_;
  GpuDeviceInfo* gpu_device_info_ = nullptr;
  mutex trace_mu_;
  TfGpuId tf_gpu_id_;
  const bool sync_every_op_ = false;
  EventMgr* em_ = nullptr;
  std::unique_ptr<thread::ThreadPool> thread_pool_;
  std::unique_ptr<GPUKernelTracker> kernel_tracker_;
  int32 pending_cap_ = 0;
  bool timestamped_allocator_ = false;

  // Initialize scratch buffers used by Eigen.
  Status InitScratchBuffers();

  void ReinitializeDevice(OpKernelContext* context, PerOpGpuDevice* device,
                          int stream_id, Allocator* allocator);

  std::string ComputeOpKernelDebugString(const OpKernel& op_kernel,
                                         const int& stream_id);

  // This method returns an initialization status, in addition to
  // calling the "done" StatusCallback, if there is a failure to
  // allocate memory or if the tensor "from" is not DMA-copyable.
  // If there is no error prior to enqueueing the copy, an OK status
  // is returned.
  Status MaybeCopyTensorToGPU(const AllocatorAttributes& alloc_attrs,
                              const Tensor& from, Tensor* to,
                              StatusCallback done);
};
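
// A minimal lifecycle sketch for orientation only (names such as `device`
// and `options` are illustrative; the authoritative flow lives in the
// factory and runtime sources, not in this header): a concrete subclass
// instance is created by a BaseGPUDeviceFactory, Init() must succeed before
// the device is used, and the runtime then drives per-op execution.
//
//   std::unique_ptr<BaseGPUDevice> device = /* built by a factory */;
//   TF_RETURN_IF_ERROR(device->Init(options));
//   // Per scheduled op, the executor calls device->Compute(kernel, ctx) or
//   // device->ComputeAsync(async_kernel, ctx, done); Sync() blocks until
//   // work already submitted to the device has completed.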

// A per-compute-stream utility that keeps track of kernels that have been
// queued for execution but may not yet have terminated, and also of the
// queued time of the most recently terminated kernel.
class GPUKernelTracker {
 public:
  // Controls the strategy for inserting tracking events after GPU kernels.
  //   If max_interval >= 0, then insert an event after this many kernels
  //     if an event has not been inserted for another reason.
  //   If max_bytes > 0, then insert an event after kernels allocating this
  //     many bytes have been queued since the last event.
  //   If max_pending > 0, then track up to this many events at once.  If
  //     this limit is reached the GPU::Compute() method will delay starting
  //     additional ops until some event completes.  If 0 and one of the other
  //     fields is non-zero, then a reasonable default will be selected.
  struct Params {
    int max_interval = 0;
    int max_bytes = 0;
    int max_pending = 0;
    Params(int mi, int mb, int mp)
        : max_interval(mi), max_bytes(mb), max_pending(mp) {}
  };
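
  // For example (illustrative values, not recommended defaults), a tracker
  // that inserts a tracking event at least every 32 queued kernels and
  // throttles Compute() once more than 64 tracked kernels are outstanding
  // could be configured with:
  //
  //   GPUKernelTracker::Params params(/*max_interval=*/32, /*max_bytes=*/0,
  //                                   /*max_pending=*/64);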

  // If we're going to share a SharedCounter with an allocator, it's owned
  // by the allocator because allocators are initialized once per process.
  // Devices are per-session.
  explicit GPUKernelTracker(const Params& params, Env* env,
                            se::Stream* compute_stream,
                            SharedCounter* timing_counter, Allocator* allocator,
                            EventMgr* event_manager)
      : params_(params),
        env_(env),
        stream_(compute_stream),
        timing_counter_(timing_counter),
        allocator_(allocator),
        em_(event_manager),
        pending_kernels_(
            params.max_pending > 0 ? std::max(8, 2 * params.max_pending) : 64) {
    mem_since_last_ = 0;
    if (!timing_counter_) {
      // There's not a preexisting counter owned by GPUProcessState, i.e.
      // pending_cap > 0 but timestamped_allocator == false.
      owned_counter_.reset(new SharedCounter);
      timing_counter_ = owned_counter_.get();
    }
  }

  // Determines whether a GPU kernel should have a recording event queued
  // immediately afterwards.  If so, advances the counter and returns the new
  // counter value after enqueuing.
  uint64 MaybeQueue(OpKernelContext* ctx);

  // Records that a GPU kernel has just been enqueued on the compute stream.
  // Inserts the supplied counter value in a new PendingKernel record appended
  // to the end of the ring buffer, then returns that same count.
  // The caller is responsible for ensuring that RecordTerminated() is
  // eventually called with the same counter value.
  void RecordQueued(uint64 queued_count, int weight)
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Takes a count value returned by RecordQueued and finds the corresponding
  // PendingKernel record in the ring buffer.  Marks the kernel as completed and
  // advances the completion frontier accordingly.
  void RecordTerminated(uint64 queued_count);

  // Returns the largest timing count such that all kernels queued no
  // later than that count are known to have terminated.  If that count has
  // not advanced past `old_value`, a progress event may be queued to help it
  // advance.
  inline uint64 LastTerminatedCount(uint64 old_value) {
    uint64 new_value = last_terminated_count_.load(std::memory_order_relaxed);
    if (new_value == old_value) {
      MaybeQueueProgressEvent();
    }
    return new_value;
  }

  // Returns the number of kernels enqueued that are not yet known to
  // have terminated.
  int NumPending() {
    mutex_lock l(mu_);
    return num_pending_;
  }

  // Yields the current thread until the number of pending kernels no longer
  // exceeds the cap.
  void PauseWhilePendingExceeds(int cap) TF_LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    while (num_pending_ > cap) {
      VLOG(1) << "num_pending_=" << num_pending_ << " cap=" << cap;
      pending_decreased_.wait(l);
    }
  }
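
  // A usage sketch pieced together from the comments above; `tracker`,
  // `stream`, and `em` are illustrative names, and scheduling the completion
  // callback through the EventMgr is one plausible arrangement rather than a
  // requirement of this interface:
  //
  //   uint64 queued_count = tracker->MaybeQueue(ctx);
  //   if (queued_count > 0) {
  //     em->ThenExecute(stream, [tracker, queued_count]() {
  //       tracker->RecordTerminated(queued_count);
  //     });
  //   }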

 private:
  friend class GPUKernelTrackerTest;
  Params params_;
  Env* env_;
  se::Stream* stream_;
  SharedCounter* timing_counter_;
  std::unique_ptr<SharedCounter> owned_counter_;
  Allocator* allocator_ = nullptr;
  EventMgr* em_ = nullptr;
  std::atomic<uint64> last_terminated_count_ = {1};

  void MaybeQueueProgressEvent();

  // Records when a kernel was queued for execution.  Kernel launches are
  // identified by a unique count value from a per-GPU device timing counter.
  struct PendingKernel {
    uint64 queued_count;
    int weight;
    bool terminated;
    PendingKernel(const PendingKernel& pk)
        : queued_count(pk.queued_count),
          weight(pk.weight),
          terminated(pk.terminated) {}
    PendingKernel() : queued_count(0), weight(0), terminated(false) {}
  };
  mutex mu_;
  int32 mem_since_last_ TF_GUARDED_BY(mu_);
  int32 ops_since_last_ TF_GUARDED_BY(mu_);
  // Ring buffer of PendingKernel records.
  std::vector<PendingKernel> pending_kernels_ TF_GUARDED_BY(mu_);
  // Next unused slot in pending_kernels_.
  int first_available_ TF_GUARDED_BY(mu_) = 0;
  // Last completed PendingKernel such that all prior PendingKernels are
  // also completed.  With out-of-order completion there may be a mixture
  // of completed and uncompleted entries between last_completed_ and
  // first_available_.
  int last_completed_ TF_GUARDED_BY(mu_) = -1;
  // Sum of weights of the outstanding events marking tracked kernels.
  int num_pending_ TF_GUARDED_BY(mu_) = 0;
  condition_variable pending_decreased_ TF_GUARDED_BY(mu_);
};

class BaseGPUDeviceFactory : public DeviceFactory {
 public:
  Status ListPhysicalDevices(std::vector<string>* devices) override;
  Status CreateDevices(const SessionOptions& options,
                       const std::string& name_prefix,
                       std::vector<std::unique_ptr<Device>>* devices) override;
  Status GetDeviceDetails(int device_index,
                          std::unordered_map<string, string>* details) override;

  struct InterconnectMap {
    // Name of interconnect technology, if known.
    std::string name;
    // If possible, strength should approximate the Gb/sec bandwidth rate.
    // Where architecture-specific subclassing is not done, that won't
    // always be possible.  The minimum expectation is that faster links
    // should have a higher value than slower links.
    int32 strength;
    static const int kSameDeviceStrength;
    static const int kStreamExecutorStrength;
    std::set<std::pair<PlatformGpuId, PlatformGpuId>> directed_links;
  };

 protected:
  // Populates *maps with interconnect maps for all local direct access
  // pathways between GPUs.
  virtual Status GetInterconnectMaps(
      const std::vector<PlatformGpuId>& visible_gpu_order,
      se::Platform* gpu_manager, std::vector<InterconnectMap>* maps);
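
  // Illustrative sketch only (the values and technology name are assumptions,
  // not mandated by this header): an override that detects a bidirectional
  // peer-access link between platform GPUs 0 and 1 might contribute an entry
  // such as:
  //
  //   InterconnectMap imap;
  //   imap.name = "StreamExecutor";
  //   imap.strength = InterconnectMap::kStreamExecutorStrength;
  //   imap.directed_links.insert({PlatformGpuId(0), PlatformGpuId(1)});
  //   imap.directed_links.insert({PlatformGpuId(1), PlatformGpuId(0)});
  //   maps->push_back(imap);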

  struct TfGpuIdHash {
    std::size_t operator()(const TfGpuId& id) const noexcept {
      return std::hash<int>{}(id.value());
    }
  };
  typedef std::unordered_map<TfGpuId, DeviceLocality, TfGpuIdHash> LocalityMap;
  // Populates *localities with the DeviceLocality descriptor for
  // every TfGpuId.
  virtual Status GetDeviceLocalities(
      int num_tf_gpus, const std::vector<InterconnectMap>& interconnects,
      LocalityMap* localities);

 private:
  // Creates a BaseGPUDevice associated with 'tf_gpu_id', allocates (strictly)
  // 'memory_limit' bytes of GPU memory to it, and adds it to the 'devices'
  // vector.
  Status CreateGPUDevice(const SessionOptions& options,
                         const std::string& name_prefix, TfGpuId tf_gpu_id,
                         int64 memory_limit, const DeviceLocality& dev_locality,
                         size_t num_tf_gpus,
                         std::vector<std::unique_ptr<Device>>* devices);

  virtual std::unique_ptr<BaseGPUDevice> CreateGPUDevice(
      const SessionOptions& options, const string& name, Bytes memory_limit,
      const DeviceLocality& dev_locality, TfGpuId tf_gpu_id,
      const string& physical_device_desc, Allocator* gpu_allocator,
      Allocator* cpu_allocator) = 0;

  Status EnablePeerAccess(const std::vector<PlatformGpuId>& visible_gpu_order);

  // Returns into 'ids' the list of valid platform GPU ids, in the order that
  // they should map to TF GPU ids "/device:GPU:0", "/device:GPU:1", etc.,
  // based upon 'visible_gpu_order', which was generated by parsing
  // GPUOptions::visible_device_list, a comma-separated list of CUDA or ROCm
  // GPU ids.
  Status GetValidDeviceIds(const std::vector<PlatformGpuId>& visible_gpu_order,
                           std::vector<PlatformGpuId>* ids);

  // Caches the valid device IDs if not already cached. Cached IDs are stored
  // in the field cached_device_ids_. Passes {0, 1, ..., num_devices-1} to
  // GetValidDeviceIds, so this should only be used in functions where all
  // devices should be treated as visible, like ListPhysicalDevices.
  Status CacheDeviceIds();

  // visible_gpu_initialized_[platform_gpu_id] is true if visible GPU
  // platform_gpu_id has been initialized by the process.
  std::unordered_map<int, bool> visible_gpu_initialized_;

  // Cached device IDs, as returned by GetValidDeviceIds when every physical
  // device is visible. The cache should not be used if some devices are not
  // visible.
  std::vector<PlatformGpuId> cached_device_ids_;
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_