/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#if !GOOGLE_CUDA && !TENSORFLOW_USE_ROCM
#error This file must only be included when building with Cuda or ROCm support
#endif

#ifndef TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_
#define TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_

#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
#include "tensorflow/core/common_runtime/device/device_id_utils.h"
#include "tensorflow/core/common_runtime/device_factory.h"
#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id.h"
#include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
#include "tensorflow/core/common_runtime/gpu_device_context.h"
#include "tensorflow/core/common_runtime/local_device.h"
#include "tensorflow/core/common_runtime/scoped_allocator_mgr.h"
#include "tensorflow/core/common_runtime/shared_counter.h"
#include "tensorflow/core/framework/allocator.h"
#include "tensorflow/core/framework/device_base.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/gtl/inlined_vector.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/public/session_options.h"

namespace tensorflow {

class GPUKernelTracker;

class BaseGPUDevice : public LocalDevice {
 public:
  BaseGPUDevice(const SessionOptions& options, const std::string& name,
                Bytes memory_limit, const DeviceLocality& locality,
                TfGpuId tf_gpu_id, const std::string& physical_device_desc,
                Allocator* gpu_allocator, Allocator* cpu_allocator,
                bool sync_every_op);

  ~BaseGPUDevice() override;

  // Initialize the device and return the status of initialization.
  Status Init(const SessionOptions& options);

  void Compute(OpKernel* op_kernel, OpKernelContext* context) override;

  Status Sync() override;

  void ComputeAsync(AsyncOpKernel* op_kernel, OpKernelContext* context,
                    AsyncOpKernel::DoneCallback done) override;

  Status MakeTensorFromProto(const TensorProto& tensor_proto,
                             const AllocatorAttributes alloc_attrs,
                             Tensor* tensor) override;

  void CopyTensorInSameDevice(const Tensor* input_tensor,
                              Tensor* output_tensor,
                              const DeviceContext* device_context,
                              StatusCallback done) override;

  // The caller owns the returned device.
  PerOpGpuDevice* MakeGpuDevice() override;

  Status ReinitializeGpuDevice(OpKernelContext* context,
                               PerOpGpuDevice* device, DeviceContext* dc,
                               Allocator* allocator) override;

  // Returns the platform GPU id of this device within the native driver
  // system; e.g., for CUDA and ROCm this is the ordinal of the GPU within the
  // system.
  int gpu_id() const {
    PlatformGpuId platform_gpu_id;
    TF_CHECK_OK(GpuIdManager::TfToPlatformGpuId(tf_gpu_id_, &platform_gpu_id));
    return platform_gpu_id.value();
  }

  // The executor that provides control for the device; e.g., for CUDA this
  // corresponds to the CUDA context.
  se::StreamExecutor* executor() const { return executor_; }

  Allocator* GetScopedAllocator(AllocatorAttributes attr,
                                int64 step_id) override;

  ScopedAllocatorMgr* GetScopedAllocatorMgr() const override {
    return scoped_allocator_mgr_.get();
  }

  // The following two functions always return 0 unless one of the
  // related experimental config options has been specified.

  // If the returned value is > 0, then GPU memory chunks freed before this
  // count are guaranteed not to be in use by any kernel pending on this
  // device.
  uint64 SafeAllocFrontier(uint64 old_value) override;

  // Returns the number of kernels that have been queued for execution on
  // the compute stream and are not yet known to have completed.
  int PendingKernels();

  int priority() const { return stream_->priority; }

  // Helper method for unit tests to reset the streams. Never use in
  // production.
  static void TestOnlyReset();

 protected:
  Allocator* gpu_allocator_;  // not owned
  Allocator* cpu_allocator_;  // not owned

  se::StreamExecutor* executor_;  // not owned
  std::unique_ptr<ScopedAllocatorMgr> scoped_allocator_mgr_;

 private:
  friend class GPUDeviceTestHelper;
  struct StreamGroup {
    se::Stream* compute = nullptr;
#if TENSORFLOW_USE_ROCM
    se::Stream* nccl = nullptr;
#endif
    se::Stream* host_to_device = nullptr;
    se::Stream* device_to_host = nullptr;
    gtl::InlinedVector<se::Stream*, 4> device_to_device;
    int priority = 0;
  };
  class StreamGroupFactory;

  StreamGroup* stream_;
  mutex scratch_init_mutex_;
  char* scratch_ = nullptr;
  GPUDeviceContext* device_context_;
  GpuDeviceInfo* gpu_device_info_ = nullptr;
  mutex trace_mu_;
  TfGpuId tf_gpu_id_;
  const bool sync_every_op_ = false;
  EventMgr* em_ = nullptr;
  std::unique_ptr<thread::ThreadPool> thread_pool_;
  std::unique_ptr<GPUKernelTracker> kernel_tracker_;
  int32 pending_cap_ = 0;
  bool timestamped_allocator_ = false;
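
  // Illustrative sketch only; `prev_frontier` and `chunk_freed_at` are
  // hypothetical values held by a caller, not part of this interface. When
  // the timestamped allocator and kernel tracking are enabled, an allocator
  // that recorded the count at which a chunk was freed could consult
  // SafeAllocFrontier() before reusing that chunk:
  //
  //   uint64 frontier = device->SafeAllocFrontier(prev_frontier);
  //   bool safe_to_reuse = frontier > 0 && chunk_freed_at < frontier;
  //
  // A return value of 0 means the related experimental options are off and
  // no such guarantee is available.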

  // Initialize scratch buffers used by Eigen.
  Status InitScratchBuffers();

  void ReinitializeDevice(OpKernelContext* context, PerOpGpuDevice* device,
                          int stream_id, Allocator* allocator);

  std::string ComputeOpKernelDebugString(const OpKernel& op_kernel,
                                         const int& stream_id);

  // This method returns an initialization status, in addition to
  // calling the "done" StatusCallback, if there is a failure to
  // allocate memory or if the tensor "from" is not DMA-copyable.
  // If there is no error prior to enqueueing the copy, an OK status
  // is returned.
  Status MaybeCopyTensorToGPU(const AllocatorAttributes& alloc_attrs,
                              const Tensor& from, Tensor* to,
                              StatusCallback done);
};

// A per-compute-stream utility that keeps track of kernels that have been
// queued for execution but may not yet have terminated, and also the queued
// time of the most recently terminated kernel.
class GPUKernelTracker {
 public:
  // Controls the strategy for inserting tracking events after GPU kernels.
  // If max_interval >= 0, then insert an event after this many kernels
  // if an event has not been inserted for another reason.
  // If max_bytes > 0, then insert an event after kernels allocating this
  // many bytes have been queued since the last event.
  // If max_pending > 0, then track up to this many events at once. If
  // this limit is reached, the device's Compute() method will delay starting
  // additional ops until some event completes. If 0 and one of the other
  // fields is non-zero, then a reasonable default will be selected.
  struct Params {
    int max_interval = 0;
    int max_bytes = 0;
    int max_pending = 0;
    Params(int mi, int mb, int mp)
        : max_interval(mi), max_bytes(mb), max_pending(mp) {}
  };

  // If we're going to share a SharedCounter with an allocator, it's owned
  // by the allocator because allocators are initialized once per process.
  // Devices are per-session.
  explicit GPUKernelTracker(const Params& params, Env* env,
                            se::Stream* compute_stream,
                            SharedCounter* timing_counter,
                            Allocator* allocator, EventMgr* event_manager)
      : params_(params),
        env_(env),
        stream_(compute_stream),
        timing_counter_(timing_counter),
        allocator_(allocator),
        em_(event_manager),
        pending_kernels_(
            params.max_pending > 0 ? std::max(8, 2 * params.max_pending)
                                   : 64) {
    mem_since_last_ = 0;
    if (!timing_counter_) {
      // There's no preexisting counter owned by GPUProcessState, i.e.
      // pending_cap > 0 but timestamped_allocator == false.
      owned_counter_.reset(new SharedCounter);
      timing_counter_ = owned_counter_.get();
    }
  }

  // Determine whether a GPU kernel should have a recording event queued
  // immediately afterwards. If so, advance the counter and return the new
  // counter value after enqueuing.
  uint64 MaybeQueue(OpKernelContext* ctx);

  // Record that a GPU kernel has just been enqueued on the compute stream.
  // Inserts the supplied counter value in a new PendingKernel record appended
  // to the end of the ring buffer. The caller is responsible for ensuring
  // that RecordTerminated() is eventually called with the same counter value.
  void RecordQueued(uint64 queued_count, int weight)
      TF_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Takes a count value returned by RecordQueued and finds the corresponding
  // PendingKernel record in the ring buffer. Marks the kernel as completed
  // and advances the completion frontier accordingly.
  void RecordTerminated(uint64 queued_count);
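
  // Illustrative sketch only; `tracker` and `ctx` are hypothetical names, and
  // the assumption that a zero return means "no tracking event was queued" is
  // not stated by this declaration. A compute path typically pairs the two
  // calls like this:
  //
  //   uint64 queued_count = tracker->MaybeQueue(ctx);
  //   // ... enqueue the kernel on the compute stream ...
  //   if (queued_count > 0) {
  //     // Later, once the tracking event inserted after the kernel has
  //     // completed (e.g. from an EventMgr callback):
  //     tracker->RecordTerminated(queued_count);
  //   }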

  // Returns the largest timing count such that all kernels queued no
  // later than that count are known to have terminated.
  inline uint64 LastTerminatedCount(uint64 old_value) {
    uint64 new_value = last_terminated_count_.load(std::memory_order_relaxed);
    if (new_value == old_value) {
      MaybeQueueProgressEvent();
    }
    return new_value;
  }

  // Returns the number of kernels enqueued that are not yet known to
  // have terminated.
  int NumPending() {
    mutex_lock l(mu_);
    return num_pending_;
  }

  // Yield current thread until number of pending kernels no longer
  // exceeds the cap.
  void PauseWhilePendingExceeds(int cap) TF_LOCKS_EXCLUDED(mu_) {
    mutex_lock l(mu_);
    while (num_pending_ > cap) {
      VLOG(1) << "num_pending_=" << num_pending_ << " cap=" << cap;
      pending_decreased_.wait(l);
    }
  }

 private:
  friend class GPUKernelTrackerTest;
  Params params_;
  Env* env_;
  se::Stream* stream_;
  SharedCounter* timing_counter_;
  std::unique_ptr<SharedCounter> owned_counter_;
  Allocator* allocator_ = nullptr;
  EventMgr* em_ = nullptr;
  std::atomic<uint64> last_terminated_count_ = {1};

  void MaybeQueueProgressEvent();

  // Records when a kernel was queued for execution. Kernel launches are
  // identified by a unique count value from a per-GPU device timing counter.
  struct PendingKernel {
    uint64 queued_count;
    int weight;
    bool terminated;
    PendingKernel(const PendingKernel& pk)
        : queued_count(pk.queued_count),
          weight(pk.weight),
          terminated(pk.terminated) {}
    PendingKernel() : queued_count(0), weight(0), terminated(false) {}
  };
  mutex mu_;
  int32 mem_since_last_ TF_GUARDED_BY(mu_);
  int32 ops_since_last_ TF_GUARDED_BY(mu_);
  // Ring buffer of PendingKernel records.
  std::vector<PendingKernel> pending_kernels_ TF_GUARDED_BY(mu_);
  // Next unused slot in pending_kernels_.
  int first_available_ TF_GUARDED_BY(mu_) = 0;
  // Last completed PendingKernel such that all prior PendingKernels are
  // also completed. With out-of-order completion there may be a mixture
  // of completed and uncompleted entries between last_completed_ and
  // first_available_.
  int last_completed_ TF_GUARDED_BY(mu_) = -1;
  // Sum of weights of the outstanding events marking tracked kernels.
  int num_pending_ TF_GUARDED_BY(mu_) = 0;
  condition_variable pending_decreased_ TF_GUARDED_BY(mu_);
};
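
// Illustrative sketch only; `tracker` and `pending_cap` are hypothetical
// names. A device configured with a pending-kernel cap can throttle kernel
// launches by blocking until enough tracked kernels have terminated:
//
//   tracker->PauseWhilePendingExceeds(pending_cap);
//   // ... now safe to enqueue the next kernel on the compute stream ...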

class BaseGPUDeviceFactory : public DeviceFactory {
 public:
  Status ListPhysicalDevices(std::vector<string>* devices) override;
  Status CreateDevices(const SessionOptions& options,
                       const std::string& name_prefix,
                       std::vector<std::unique_ptr<Device>>* devices) override;
  Status GetDeviceDetails(
      int device_index, std::unordered_map<string, string>* details) override;

  struct InterconnectMap {
    // Name of interconnect technology, if known.
    std::string name;
    // If possible, strength should approximate Gb/sec bandwidth rate.
    // Where architecture-specific subclassing is not done, that won't
    // always be possible. The minimum expectation is that
    // faster links should have a higher value than slower links.
    int32 strength;
    static const int kSameDeviceStrength;
    static const int kStreamExecutorStrength;
    std::set<std::pair<PlatformGpuId, PlatformGpuId>> directed_links;
  };

 protected:
  // Populates *maps with interconnect maps for all local direct access
  // pathways between GPUs.
  virtual Status GetInterconnectMaps(
      const std::vector<PlatformGpuId>& visible_gpu_order,
      se::Platform* gpu_manager, std::vector<InterconnectMap>* maps);

  struct TfGpuIdHash {
    std::size_t operator()(const TfGpuId& id) const noexcept {
      return std::hash<int>{}(id.value());
    }
  };
  typedef std::unordered_map<TfGpuId, DeviceLocality, TfGpuIdHash> LocalityMap;
  // Populates *localities with the DeviceLocality descriptor for
  // every TfGpuId.
  virtual Status GetDeviceLocalities(
      int num_tf_gpus, const std::vector<InterconnectMap>& interconnects,
      LocalityMap* localities);

 private:
  // Creates a BaseGPUDevice associated with 'tf_gpu_id', allocates (strictly)
  // 'memory_limit' bytes of GPU memory to it, and adds it to the 'devices'
  // vector.
  Status CreateGPUDevice(const SessionOptions& options,
                         const std::string& name_prefix, TfGpuId tf_gpu_id,
                         int64 memory_limit,
                         const DeviceLocality& dev_locality,
                         size_t num_tf_gpus,
                         std::vector<std::unique_ptr<Device>>* devices);

  virtual std::unique_ptr<BaseGPUDevice> CreateGPUDevice(
      const SessionOptions& options, const string& name, Bytes memory_limit,
      const DeviceLocality& dev_locality, TfGpuId tf_gpu_id,
      const string& physical_device_desc, Allocator* gpu_allocator,
      Allocator* cpu_allocator) = 0;

  Status EnablePeerAccess(const std::vector<PlatformGpuId>& visible_gpu_order);

  // Returns into 'ids' the list of valid platform GPU ids, in the order that
  // they should map to TF GPU ids "/device:GPU:0", "/device:GPU:1", etc.,
  // based upon 'visible_gpu_order', which was generated by parsing
  // GPUOptions::visible_device_list, a comma-separated list of CUDA or ROCm
  // GPU ids.
  Status GetValidDeviceIds(const std::vector<PlatformGpuId>& visible_gpu_order,
                           std::vector<PlatformGpuId>* ids);

  // Cache the valid device IDs if not already cached. Cached IDs are stored
  // in the field cached_device_ids_. Passes {0, 1, ..., num_devices-1} to
  // GetValidDeviceIds, so this should only be used in functions where all
  // devices should be treated as visible, like ListPhysicalDevices.
  Status CacheDeviceIds();

  // visible_gpu_initialized_[platform_gpu_id] is true if visible GPU
  // platform_gpu_id has been initialized by the process.
  std::unordered_map<int, bool> visible_gpu_initialized_;

  // Cached device IDs, as returned by GetValidDeviceIds when every physical
  // device is visible. The cache should not be used if some devices are not
  // visible.
  std::vector<PlatformGpuId> cached_device_ids_;
};

}  // namespace tensorflow

#endif  // TENSORFLOW_CORE_COMMON_RUNTIME_GPU_GPU_DEVICE_H_