1 /* Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 // TODO(opensource): Use a more generic sounding preprocessor name than
17 // GOOGLE_CUDA
18 #if (defined(GOOGLE_CUDA) && GOOGLE_CUDA) || \
19     (defined(TENSORFLOW_USE_ROCM) && TENSORFLOW_USE_ROCM)
20 
21 #if TENSORFLOW_USE_ROCM
22 #include "rocm/include/hip/hip_runtime.h"
23 #endif
24 
25 #define EIGEN_USE_GPU
26 
27 #include <stdlib.h>
28 #include <string.h>
29 
30 #include <algorithm>
31 #include <list>
32 #include <map>
33 #include <tuple>
34 #include <vector>
35 
36 #include "third_party/eigen3/unsupported/Eigen/CXX11/Tensor"
37 #include "tensorflow/core/common_runtime/device/device_event_mgr.h"
38 #include "tensorflow/core/common_runtime/device/device_id_utils.h"
39 #include "tensorflow/core/common_runtime/device_factory.h"
40 #include "tensorflow/core/common_runtime/gpu/gpu_device.h"
41 #include "tensorflow/core/common_runtime/gpu/gpu_id.h"
42 #include "tensorflow/core/common_runtime/gpu/gpu_id_manager.h"
43 #include "tensorflow/core/common_runtime/gpu/gpu_init.h"
44 #include "tensorflow/core/common_runtime/gpu/gpu_process_state.h"
45 #include "tensorflow/core/common_runtime/gpu/gpu_util.h"
46 #include "tensorflow/core/common_runtime/gpu_device_context.h"
47 #include "tensorflow/core/common_runtime/local_device.h"
48 #include "tensorflow/core/framework/allocator.h"
49 #include "tensorflow/core/framework/device_base.h"
50 #include "tensorflow/core/framework/op_kernel.h"
51 #include "tensorflow/core/framework/tensor.h"
52 #include "tensorflow/core/framework/tensor.pb.h"
53 #include "tensorflow/core/framework/types.h"
54 #include "tensorflow/core/framework/variant_op_registry.h"
55 #include "tensorflow/core/graph/types.h"
56 #include "tensorflow/core/lib/core/errors.h"
57 #include "tensorflow/core/lib/core/status.h"
58 #include "tensorflow/core/lib/strings/numbers.h"
59 #include "tensorflow/core/lib/strings/str_util.h"
60 #include "tensorflow/core/lib/strings/strcat.h"
61 #if GOOGLE_CUDA
62 #include "third_party/gpus/cudnn/cudnn.h"
63 #include "tensorflow/stream_executor/cuda/cuda_activation.h"
64 #elif TENSORFLOW_USE_ROCM
65 #include "tensorflow/core/platform/rocm.h"
66 #endif
67 #include "tensorflow/core/platform/logging.h"
68 #include "tensorflow/core/platform/macros.h"
69 #include "tensorflow/core/platform/stream_executor.h"
70 #include "tensorflow/core/platform/types.h"
71 #include "tensorflow/core/profiler/lib/scoped_annotation.h"
72 #include "tensorflow/core/public/session_options.h"
73 #include "tensorflow/core/util/device_name_utils.h"
74 #include "tensorflow/core/util/env_var.h"
75 #include "tensorflow/core/util/stream_executor_util.h"
76 #include "tensorflow/stream_executor/gpu/gpu_stream.h"
77 #include "tensorflow/stream_executor/platform/dso_loader.h"
78 
79 #if !defined(PLATFORM_GOOGLE)
80 #if GOOGLE_CUDA
81 #include "third_party/gpus/cuda/cuda_config.h"
82 #endif
83 #endif
84 
85 namespace tensorflow {
86 
87 #if GOOGLE_CUDA
88 
89 typedef cudaStream_t gpuStream_t;
90 typedef cudaDeviceProp gpuDeviceProp_t;
91 #define EIGEN_GPU_SCRATCH_SIZE (Eigen::kGpuScratchSize)
92 using se::cuda::ScopedActivateExecutorContext;
93 
94 #elif TENSORFLOW_USE_ROCM
95 
96 typedef hipStream_t gpuStream_t;
97 typedef hipDeviceProp_t gpuDeviceProp_t;
98 #define EIGEN_GPU_SCRATCH_SIZE (Eigen::kGpuScratchSize)
99 using se::rocm::ScopedActivateExecutorContext;
100 
101 #endif
102 
103 // Eigen Ops directly allocate memory only for temporary buffers used
104 // during OpKernel::Compute().  The recommended way of allocating such
105 // memory is via OpKernelContext::allocate_temp().  However, Eigen Ops
106 // don't have access to OpKernelContext, instead they get access to
107 // memory directly through the device allocator.  As an Open Source
108 // project, Eigen assumes allocator semantics similar to those of the
109 // CUDA or ROCm memory allocator, and may not work correctly due to race
110 // conditions if used with some other allocator.  For safety, we need
111 // to delay deallocation calls out of Eigen until all events on the
112 // corresponding stream have completed.  The following class serves
113 // this purpose for both the CUDA and ROCm compilation environments.
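//
// Rough sketch of the deferred-free flow (illustrative only; the real call
// sites are in the class below):
//   void* p = allocator_->AllocateRaw(32, n);     // inside an Eigen kernel
//   ... GPU work enqueued on stream_ reads/writes p ...
//   deallocate(p);   // enqueues asyncFree via cudaStreamAddCallback /
//                    // hipStreamAddCallback instead of freeing immediately
//   asyncFree();     // runs once the stream reaches the callback and calls
//                    // allocator_->DeallocateRaw(p)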
114 
115 class EigenGpuStreamDevice : public ::Eigen::StreamInterface {
116  public:
117   EigenGpuStreamDevice()
118       : scratch_(nullptr), semaphore_(nullptr), context_(nullptr) {
119     Eigen::initializeDeviceProp();
120   }
121   ~EigenGpuStreamDevice() override {}
122   void Reinitialize(OpKernelContext* context, const gpuStream_t* gpu_stream,
123                     TfGpuId tf_gpu_id, ::tensorflow::Allocator* alloc,
124                     char* scratch) {
125     if (LogMemory::IsEnabled()) {
126       operation_ = context->op_kernel().name() + "/EigenAllocator";
127       step_id_ = context->step_id();
128     }
129     context_ = context;
130     scratch_ = scratch;
131     semaphore_ =
132         reinterpret_cast<unsigned int*>(scratch + Eigen::kGpuScratchSize);
133     stream_ = gpu_stream;
134     allocator_ = alloc;
135     PlatformGpuId platform_gpu_id;
136     TF_CHECK_OK(GpuIdManager::TfToPlatformGpuId(tf_gpu_id, &platform_gpu_id));
137     device_prop_ = &Eigen::m_deviceProperties[platform_gpu_id.value()];
138   }
139 
140   const gpuStream_t& stream() const override { return *stream_; }
141   const gpuDeviceProp_t& deviceProperties() const override {
142     return *device_prop_;
143   }
144 
145   void* allocate(size_t num_bytes) const override {
146     void* ret = allocator_->AllocateRaw(32 /* alignment */, num_bytes);
147     if (ret == nullptr) {
148       if (context_) {
149         context_->SetStatus(errors::ResourceExhausted(
150             strings::StrCat("Ran out of GPU memory when allocating ", num_bytes,
151                             " bytes for ", operation_)));
152       } else {
153         LOG(FATAL)
154             << "EigenAllocator for GPU ran out of memory when allocating "
155             << num_bytes << ". See error logs for more detailed info.";
156       }
157     }
158     if (LogMemory::IsEnabled() && ret != nullptr) {
159       LogMemory::RecordRawAllocation(operation_, step_id_, num_bytes, ret,
160                                      allocator_);
161     }
162     return ret;
163   }
164   void deallocate(void* buffer) const override {
165     if (LogMemory::IsEnabled() && buffer != nullptr) {
166       LogMemory::RecordRawDeallocation(operation_, step_id_, buffer, allocator_,
167                                        true);
168     }
169     AsyncFreeData* afData =
170         new AsyncFreeData(allocator_, buffer, operation_, step_id_);
171 #if GOOGLE_CUDA
172     cudaError_t err = cudaStreamAddCallback(*stream_, asyncFree, afData, 0);
173     CHECK_EQ(err, cudaSuccess);
174 #elif TENSORFLOW_USE_ROCM
175     hipError_t err = hipStreamAddCallback(*stream_, asyncFree, afData, 0);
176     CHECK_EQ(err, hipSuccess);
177 #endif
178   }
179 
180   // Return a pointer to a per stream scratchpad of 1024 bytes residing
181   // in global memory.
182   void* scratchpad() const override { return scratch_; }
183 
184   // Return a semaphore. The semaphore is initialized to 0, and each
185   // kernel using it is responsible for resetting it to 0 upon completion
186   // to maintain the invariant that the semaphore is always equal to 0 at
187   // each kernel start.
188   unsigned int* semaphore() const override { return semaphore_; }
189 
190  private:
191   struct AsyncFreeData {
192     AsyncFreeData(::tensorflow::Allocator* a, void* p, const string& o,
193                   const int64 s)
194         : allocator_(a), address_(p), operation_(o), step_id_(s) {}
195     ::tensorflow::Allocator* allocator_;
196     void* address_;
197     const string operation_;
198     const int64 step_id_;
199   };
200 
201 #if GOOGLE_CUDA
202   static void CUDART_CB asyncFree(gpuStream_t stream, cudaError_t status,
203                                   void* userData) {
204 #elif TENSORFLOW_USE_ROCM
205   static void asyncFree(gpuStream_t stream, hipError_t status, void* userData) {
206 #endif
207     AsyncFreeData* data = static_cast<AsyncFreeData*>(userData);
208     if (LogMemory::IsEnabled()) {
209       LogMemory::RecordRawDeallocation(data->operation_, data->step_id_,
210                                        data->address_, data->allocator_, false);
211     }
212     data->allocator_->DeallocateRaw(data->address_);
213     delete data;
214   }
215 
216   string operation_;
217   int64 step_id_;
218   const gpuStream_t* stream_;           // Not owned.
219   const gpuDeviceProp_t* device_prop_;  // Not owned.
220   ::tensorflow::Allocator* allocator_;  // Not owned.
221   mutable char* scratch_;
222   mutable unsigned int* semaphore_;
223   OpKernelContext* context_;
224 
225   TF_DISALLOW_COPY_AND_ASSIGN(EigenGpuStreamDevice);
226 };
227 
228 // This factory helps to ensure that different GPU device objects that refer to
229 // the same physical device and stream group id use the same stream group
230 // object (and therefore the same CUDA streams). This is necessary since there
231 // is a single memory allocator per device (see ProcessState::GetGPUAllocator)
232 // and allocators must not be shared across streams.
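//
// For example, two BaseGPUDevice instances created for the same tf_gpu_id
// with stream_group_within_gpu == 0 receive the same StreamGroup* from
// GetOrCreate() below, and therefore share the same compute and copy streams.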
233 class BaseGPUDevice::StreamGroupFactory {
234  public:
235   // Returns the unique stream group for use with the stream defined by
236   // {tf_gpu_id, stream_group_within_gpu}, creating it if it does not yet
237   // exist.
238   // This function is thread safe.
239   BaseGPUDevice::StreamGroup* GetOrCreate(TfGpuId tf_gpu_id,
240                                           int stream_group_within_gpu,
241                                           se::StreamExecutor* executor,
242                                           const GPUOptions& options) {
243     mutex_lock guard(lock_);
244     StreamGroup* group =
245         &streams_[key_type(tf_gpu_id.value(), stream_group_within_gpu)];
246     if (!group->compute) {
247       int priority = GetPriority(tf_gpu_id.value(), options);
248       group->priority = priority;
249       group->compute = GetStream(executor, priority);
250       group->compute->Init();
251       VLOG(2) << "Created stream[" << stream_group_within_gpu
252               << "] = " << group->compute << " with priority: " << priority;
253 
254 #if TENSORFLOW_USE_ROCM
255       // ROCm streams are lightweight and will not necessarily trigger device
256       // queue init until they are first used. For optimal performance,
257       // compute and nccl streams must be immediate siblings.
258       group->nccl = GetStream(executor, priority);
259       group->nccl->Init();
260       VLOG(2) << "Created nccl_stream[" << stream_group_within_gpu
261               << "] = " << group->nccl;
262 
263       // Force underlying resource creation now.
264       group->compute->ThenWaitFor(group->nccl);
265       group->nccl->ThenWaitFor(group->compute);
266 #endif
267 
268       group->host_to_device = GetStream(executor, priority);
269       group->host_to_device->Init();
270       VLOG(2) << "Created host_to_device_stream[" << stream_group_within_gpu
271               << "] = " << group->host_to_device;
272 
273       group->device_to_host = GetStream(executor, priority);
274       group->device_to_host->Init();
275       VLOG(2) << "Created device_to_host_stream[" << stream_group_within_gpu
276               << "] = " << group->device_to_host;
277 
278       int num_d2d_streams =
279           options.experimental().num_dev_to_dev_copy_streams();
280       if (num_d2d_streams == 0) num_d2d_streams = 1;
281       if (num_d2d_streams < 1 || num_d2d_streams > 4) {
282         LOG(ERROR)
283             << "Illegal GPUOptions.experimental.num_dev_to_dev_copy_streams="
284             << num_d2d_streams << " set to 1 instead.";
285         num_d2d_streams = 1;
286       }
287       for (int i = 0; i < num_d2d_streams; ++i) {
288         se::Stream* stream = GetStream(executor, priority);
289         stream->Init();
290         group->device_to_device.push_back(stream);
291         VLOG(2) << "Created device_to_device_stream[" << stream_group_within_gpu
292                 << "] = " << group->device_to_device.back();
293       }
294     }
295     return group;
296   }
297 
298   // Returns a reference to the StreamGroupFactory singleton. Note that this is
299   // never destroyed, so the objects it owns are never deleted.
300   static StreamGroupFactory& Global() {
301     static StreamGroupFactory* instance = new StreamGroupFactory();
302     return *instance;
303   }
304 
305   // Helper method for unit tests to reset the streams. Never use in production.
306   void TestOnlyReset() {
307     mutex_lock guard(lock_);
308     for (auto& item : streams_) {
309       auto& stream = item.second;
310       if (stream.compute) {
311         delete stream.compute;
312         stream.compute = nullptr;
313       }
314 #if TENSORFLOW_USE_ROCM
315       if (stream.nccl) {
316         delete stream.nccl;
317         stream.nccl = nullptr;
318       }
319 #endif
320       if (stream.host_to_device) {
321         delete stream.host_to_device;
322         stream.host_to_device = nullptr;
323       }
324       if (stream.device_to_host) {
325         delete stream.device_to_host;
326         stream.device_to_host = nullptr;
327       }
328       while (!stream.device_to_device.empty()) {
329         auto back = stream.device_to_device.back();
330         if (back) {
331           delete back;
332         }
333         stream.device_to_device.pop_back();
334       }
335     }
336     streams_.clear();
337   }
338 
339  private:
340   // Returns priority for the given virtual GPU id from the session options.
341   // Returns 0 if no virtual devices are specified.
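  //
  // Illustrative example: with GPUOptions.experimental containing
  //   virtual_devices { priority: [0, -1] }  virtual_devices { priority: [1] }
  // tf_gpu_id 0 maps to priority 0, tf_gpu_id 1 to -1, and tf_gpu_id 2 to 1.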
342   int GetPriority(int tf_gpu_id, const GPUOptions& options) {
343     int id = tf_gpu_id;
344     int i = 0;
345     int priority = 0;
346     while (i < options.experimental().virtual_devices_size()) {
347       const int size =
348           options.experimental().virtual_devices().Get(i).priority_size();
349       if (id >= size) {
350         id -= size;
351       } else {
352         priority =
353             options.experimental().virtual_devices().Get(i).priority().Get(id);
354         break;
355       }
356       i++;
357     }
358     return priority;
359   }
360 
361   // Returns a Stream with the underlying GPUStream with the given priority.
362   se::Stream* GetStream(se::StreamExecutor* executor, int priority) {
363     auto stream = new se::Stream(executor);
364     static_cast<stream_executor::gpu::GpuStream*>(stream->implementation())
365         ->SetPriority(priority);
366     return stream;
367   }
368 
369   mutex lock_;
370   using key_type = std::tuple<int, int>;
371   std::map<key_type, StreamGroup> streams_;
372 
373   // StreamGroupFactory cannot be created directly; Call
374   // StreamGroupFactory::Global() to get the global instance.
375   StreamGroupFactory() = default;
376   TF_DISALLOW_COPY_AND_ASSIGN(StreamGroupFactory);
377 };
378 
379 BaseGPUDevice::BaseGPUDevice(const SessionOptions& options, const string& name,
380                              Bytes memory_limit, const DeviceLocality& locality,
381                              TfGpuId tf_gpu_id,
382                              const string& physical_device_desc,
383                              Allocator* gpu_allocator, Allocator* cpu_allocator,
384                              bool sync_every_op)
385     : LocalDevice(options, Device::BuildDeviceAttributes(name, DEVICE_GPU,
386                                                          memory_limit, locality,
387                                                          physical_device_desc)),
388       gpu_allocator_(gpu_allocator),
389       cpu_allocator_(cpu_allocator),
390       scoped_allocator_mgr_(new ScopedAllocatorMgr(name)),
391       tf_gpu_id_(tf_gpu_id),
392       sync_every_op_(sync_every_op) {
393   GPUProcessState::singleton()->EnableGPUDevice();
394 }
395 
396 BaseGPUDevice::~BaseGPUDevice() {
397   delete gpu_device_info_;
398   gpu_allocator_->DeallocateRaw(scratch_);
399   device_context_->Unref();
400 }
401 
402 // This should be idempotent: calling it again after initialization is a no-op.
403 Status BaseGPUDevice::InitScratchBuffers() {
404   mutex_lock l(scratch_init_mutex_);
405   if (!scratch_) {
406     DCHECK(stream_);
407     size_t scratch_buffer_size = Eigen::kGpuScratchSize + sizeof(unsigned int);
408     ScopedMemoryDebugAnnotation op_annotation("ScratchBuffer");
409     void* scratch_buffer = gpu_allocator_->AllocateRaw(
410         Allocator::kAllocatorAlignment, scratch_buffer_size);
411     if (scratch_buffer == nullptr) {
412       return errors::FailedPrecondition(
413           "Failed to allocate scratch buffer for device ", tf_gpu_id_.value());
414     }
415     se::DeviceMemory<char> mem(
416         se::DeviceMemoryBase(scratch_buffer, scratch_buffer_size));
417     TF_RETURN_IF_ERROR(executor_->SynchronousMemZero(
418         &mem, Eigen::kGpuScratchSize + sizeof(unsigned int)));
419     scratch_ = static_cast<char*>(scratch_buffer);
420   }
421   return Status::OK();
422 }
423 
424 Status BaseGPUDevice::Init(const SessionOptions& options) {
425   auto executor_status = DeviceIdUtil::ExecutorForTfDeviceId(
426       DEVICE_GPU, GPUMachineManager(), tf_gpu_id_);
427   if (!executor_status.status().ok()) {
428     return errors::Internal("Failed to get StreamExecutor for device ",
429                             tf_gpu_id_.value());
430   }
431 
432   executor_ = executor_status.ValueOrDie();
433 
434   stream_ = StreamGroupFactory::Global().GetOrCreate(
435       tf_gpu_id_, 0, executor_, options.config.gpu_options());
436   device_context_ =
437       new GPUDeviceContext(0, stream_->compute,
438 #if TENSORFLOW_USE_ROCM
439                            stream_->nccl,
440 #endif
441                            stream_->host_to_device, stream_->device_to_host,
442                            stream_->device_to_device);
443 
444   em_ = EventMgrFactory::Singleton()->GetEventMgr(executor_,
445                                                   options.config.gpu_options());
446 
447   GPUKernelTracker::Params tracker_params(
448       options.config.gpu_options().experimental().kernel_tracker_max_interval(),
449       options.config.gpu_options().experimental().kernel_tracker_max_bytes(),
450       options.config.gpu_options().experimental().kernel_tracker_max_pending());
451   timestamped_allocator_ =
452       options.config.gpu_options().experimental().timestamped_allocator();
453   pending_cap_ = tracker_params.max_pending;
454   if (timestamped_allocator_ ||
455       (tracker_params.max_interval > 0 || tracker_params.max_bytes > 0 ||
456        tracker_params.max_pending > 0)) {
457     SharedCounter* timing_counter = nullptr;
458     if (timestamped_allocator_) {
459       // In this case the SharedCounter was already created and set in the
460       // associated Allocator, with ownership by GPUProcessState.
461       // The GPUKernelTracker will use this SharedCounter, instead of
462       // owning its own.
463       timing_counter =
464           GPUProcessState::singleton()->GPUAllocatorCounter(tf_gpu_id_);
465       DCHECK(timing_counter);
466     }
467     kernel_tracker_.reset(new GPUKernelTracker(
468         tracker_params, Env::Default(), stream_->compute, timing_counter,
469         timestamped_allocator_ ? gpu_allocator_ : nullptr, em_));
470   }
471 
472   gpu_device_info_ = new GpuDeviceInfo;
473   gpu_device_info_->stream = stream_->compute;
474   gpu_device_info_->default_context = device_context_;
475   gpu_device_info_->event_mgr = em_;
476   PlatformGpuId platform_gpu_id;
477   TF_RETURN_IF_ERROR(
478       GpuIdManager::TfToPlatformGpuId(tf_gpu_id_, &platform_gpu_id));
479   gpu_device_info_->gpu_id = platform_gpu_id.value();
480   set_tensorflow_gpu_device_info(gpu_device_info_);
481 
482   // Whether and how the GPU device uses its own threadpool.
483   // This option is experimental. Once we confirm the best setting, we
484   // may change the default behavior and completely remove this flag.
485   // Default values might change in future releases.
486   // Possible values:
487   //   * global: GPU uses threads shared with CPU in the main compute
488   //          thread-pool. This is currently the default.
489   //   * gpu_private: GPU uses threads dedicated to this device.
490   //   * gpu_shared: All GPUs share a dedicated thread pool.
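  //
  // Example invocation (illustrative): running with
  //   TF_GPU_THREAD_MODE=gpu_private TF_GPU_THREAD_COUNT=4
  // gives each GPU device its own four-thread pool, as handled below.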
491   string gpu_thread_mode;
492   TF_RETURN_IF_ERROR(
493       ReadStringFromEnvVar("TF_GPU_THREAD_MODE", "global", &gpu_thread_mode));
494   gpu_thread_mode = absl::AsciiStrToLower(gpu_thread_mode);
495   if (gpu_thread_mode != "global") {
496     int64 gpu_thread_count = -1;
497     // Default to two threads. One for device compute and another for memory
498     // copies.
499     TF_RETURN_IF_ERROR(
500         ReadInt64FromEnvVar("TF_GPU_THREAD_COUNT", 2, &gpu_thread_count));
501     if (gpu_thread_mode == "gpu_private") {
502       // TODO(zhengxq): since these threads only serve a single GPU device,
503       //   we should set the device context once for each thread, and avoid
504       //   setting them for each kernel.
505       // TODO(zhengxq): pin the thread to the same socket of the target GPU.
506       thread_pool_.reset(new thread::ThreadPool(
507           options.env, ThreadOptions(),
508           strings::StrCat("gpu_private_", tf_gpu_id_.value()),
509           static_cast<int32>(gpu_thread_count),
510           !options.config.experimental().disable_thread_spinning(),
511           /*allocator=*/nullptr));
512       set_tensorflow_device_thread_pool(thread_pool_.get());
513     } else if (gpu_thread_mode == "gpu_shared") {
514       static thread::ThreadPool* thread_pool = new thread::ThreadPool(
515           options.env, ThreadOptions(), "gpu_shared",
516           static_cast<int32>(gpu_thread_count),
517           !options.config.experimental().disable_thread_spinning(),
518           /*allocator=*/nullptr);
519       set_tensorflow_device_thread_pool(thread_pool);
520     } else {
521       string error_message =
522           strings::StrCat("Invalid gpu_thread_mode: ", gpu_thread_mode);
523       LOG(WARNING) << error_message;
524       return errors::InvalidArgument(error_message);
525     }
526   }
527 
528   return Status::OK();
529 }
530 
531 string BaseGPUDevice::ComputeOpKernelDebugString(const OpKernel& op_kernel,
532                                                  const int& stream_id) {
533   return strings::StrCat(op_kernel.name(), " op ", op_kernel.type_string(),
534                          " on GPU ", tf_gpu_id_.value(), " stream[", stream_id,
535                          "]");
536 }
537 
538 void BaseGPUDevice::Compute(OpKernel* op_kernel, OpKernelContext* context) {
539   // NOTE(tucker): We need to discriminate between Eigen GPU
540   // operations and all others.  If an operation is Eigen
541   // implemented (or otherwise tries to launch a GPU kernel
542   // directly), we need to establish a stacked-scoped environment
543   // that directs it to execute on the proper device.  Otherwise we
544   // expect the Op to use StreamExecutor directly and correctly.
545   GPUDeviceContext* gpu_device_context = device_context_;
546   if (context->op_device_context() != nullptr) {
547     gpu_device_context =
548         static_cast<GPUDeviceContext*>(context->op_device_context());
549   }
550   se::Stream* stream = gpu_device_context->stream();
551   const auto stream_id = gpu_device_context->stream_id();
552 
553   const bool vlog_1 = VLOG_IS_ON(1);
554 
555   if (vlog_1) {
556     VLOG(1) << "GpuDevice::ComputeHelper "
557             << ComputeOpKernelDebugString(*op_kernel, stream_id);
558   }
559 
560   if (kernel_tracker_.get()) {
561     context->set_record_memory_consumption(true);
562     if (pending_cap_ > 0) {
563       kernel_tracker_->PauseWhilePendingExceeds(pending_cap_);
564     }
565   }
566   ScopedActivateExecutorContext scoped_activation{stream->parent()};
567   ScopedMemoryDebugAnnotation op_annotation(op_kernel->name_view().data(),
568                                             context->step_id());
569   op_kernel->Compute(context);
570   if (context->status().ok()) {
571     if (sync_every_op_) {
572       // Note: GPUUtil::Sync() only syncs the default stream.
573       // We need to either sync the stream used by this op, or
574       // all streams.  Given that this flag is typically used for
575       // debugging it makes more sense to sync all GPU activity.
576       context->SetStatus(GPUUtil::SyncAll(this));
577       if (vlog_1) {
578         VLOG(1) << "GpuDevice::ComputeHelper finished "
579                 << ComputeOpKernelDebugString(*op_kernel, stream_id);
580       }
581     } else if (vlog_1) {
582       VLOG(1) << "GpuDevice::ComputeHelper scheduled "
583               << ComputeOpKernelDebugString(*op_kernel, stream_id);
584     }
585     if (kernel_tracker_) {
586       GPUKernelTracker* tracker = kernel_tracker_.get();
587       DCHECK(tracker);
588       uint64 queued_count = tracker->MaybeQueue(context);
589       if (queued_count > 0) {
590         em_->ThenExecute(stream, [tracker, queued_count]() {
591           tracker->RecordTerminated(queued_count);
592         });
593       }
594     }
595   } else {
596     if (vlog_1) {
597       VLOG(1) << "GpuDevice::ComputeHelper failed to schedule "
598               << ComputeOpKernelDebugString(*op_kernel, stream_id);
599     }
600   }
601 }
602 
603 Status BaseGPUDevice::Sync() {
604   DCHECK_NE(stream_, nullptr);
605 
606   // Device::Sync is supposed to block until all operations queued on the device
607   // at the time of the call have completed.  On GPUs, only operations enqueued
608   // on the compute stream can remain pending after the (Async)OpKernel that
609   // enqueued the operation has completed.  We do use other streams for copies
610   // and collectives, but in those cases the (Async)OpKernels themselves block
611   // until the queued operation has finished.
612   return stream_->compute->BlockHostUntilDone();
613 }
614 
615 void BaseGPUDevice::ComputeAsync(AsyncOpKernel* op_kernel,
616                                  OpKernelContext* context,
617                                  AsyncOpKernel::DoneCallback done) {
618   GPUDeviceContext* gpu_device_context = device_context_;
619   if (context->op_device_context() != nullptr) {
620     gpu_device_context =
621         static_cast<GPUDeviceContext*>(context->op_device_context());
622   }
623   se::Stream* stream = gpu_device_context->stream();
624   const auto stream_id = gpu_device_context->stream_id();
625 
626   VLOG(1) << "GpuDevice::ComputeAsync " << op_kernel->name() << " op "
627           << op_kernel->type_string() << " on GPU" << tf_gpu_id_ << " stream["
628           << stream_id << "]";
629 
630   ScopedActivateExecutorContext scoped_activation{stream->parent()};
631   op_kernel->ComputeAsync(context, std::move(done));
632 }
633 
634 Status BaseGPUDevice::MaybeCopyTensorToGPU(
635     const AllocatorAttributes& alloc_attrs, const Tensor& from, Tensor* to,
636     StatusCallback done) {
637   if (alloc_attrs.on_host()) {
638     *to = from;
639     done(Status::OK());
640     return Status::OK();
641   } else {
642     if (!DMAHelper::CanUseDMA(&from)) {
643       Status err = errors::Internal("GPU copy from non-DMA ",
644                                     DataTypeString(from.dtype()), " tensor");
645       done(err);
646       return err;
647     }
648     AllocationAttributes allocation_attr;
649     uint64 safe_alloc_frontier = 0;
650     std::function<uint64()> freed_by_func = [this, &safe_alloc_frontier]() {
651       safe_alloc_frontier = SafeAllocFrontier(safe_alloc_frontier);
652       return safe_alloc_frontier;
653     };
654     if (timestamped_allocator_) {
655       allocation_attr.freed_by_func = &freed_by_func;
656     }
657     auto* copy = new Tensor(GetAllocator(alloc_attrs), from.dtype(),
658                             from.shape(), allocation_attr);
659 
660     // If the tensor is not initialized, we likely ran out of memory.
661     if (!copy->IsInitialized()) {
662       delete copy;
663       Status err = errors::ResourceExhausted(
664           "OOM when allocating tensor of shape ", from.shape().DebugString(),
665           " and type ", DataTypeString(from.dtype()));
666       done(err);
667       return err;
668     }
669 
670     auto wrapped_done = [to, copy, done = std::move(done)](const Status& s) {
671       if (s.ok()) {
672         *to = std::move(*copy);
673       }
674       delete copy;
675       done(s);
676     };
677 
678     profiler::ScopedAnnotation annotation("MakeTensorFromProto");
679     device_context_->CopyCPUTensorToDevice(
680         &from, this, copy, std::move(wrapped_done),
681         !timestamped_allocator_ /*sync_dst_compute*/);
682     return Status::OK();
683   }
684 }
685 
686 Status BaseGPUDevice::MakeTensorFromProto(const TensorProto& tensor_proto,
687                                           const AllocatorAttributes alloc_attrs,
688                                           Tensor* tensor) {
689   AllocatorAttributes attr;
690   attr.set_on_host(true);
691   attr.set_gpu_compatible(true);
692   Allocator* host_alloc = GetAllocator(attr);
693   Tensor parsed(tensor_proto.dtype());
694   if (!parsed.FromProto(host_alloc, tensor_proto)) {
695     return errors::InvalidArgument("Cannot parse tensor from proto: ",
696                                    tensor_proto.DebugString());
697   }
698 
699   ScopedMemoryDebugAnnotation op_annotation("MakeTensorFromProto", "dynamic",
700                                             parsed.dtype(), &parsed.shape());
701   if (parsed.dtype() == DT_VARIANT) {
702     const Variant* from = parsed.flat<Variant>().data();
703     int numa_node = attributes().locality().numa_node();
704     Tensor copy(cpu_allocator(numa_node), DT_VARIANT, parsed.shape());
705     Variant* copy_variant = copy.flat<Variant>().data();
706 
707     std::list<Notification> notifications;
708     Status copy_status;
709     auto copier = [this, &alloc_attrs, &notifications, &copy_status](
710                       const Tensor& from, Tensor* to) {
711       // Copier isn't run in a multithreaded environment, so we don't
712       // have to worry about the notifications list being modified in parallel.
713       notifications.emplace_back();
714       Notification& n = *notifications.rbegin();
715       return MaybeCopyTensorToGPU(alloc_attrs, from, to,
716                                   [&n, &copy_status](const Status& s) {
717                                     if (copy_status.ok()) {
718                                       copy_status.Update(s);
719                                     }
720                                     n.Notify();
721                                   });
722     };
723     Status s;
724     for (int64 ix = 0; ix < parsed.NumElements(); ++ix) {
725       s = VariantDeviceCopy(VariantDeviceCopyDirection::HOST_TO_DEVICE,
726                             from[ix], &copy_variant[ix], copier);
727       if (!s.ok()) {
728         break;
729       }
730     }
731     for (auto& n : notifications) {
732       n.WaitForNotification();
733     }
734     if (!s.ok()) {
735       return s;
736     }
737     *tensor = std::move(copy);
738     return copy_status;
739   } else {
740     Notification n;
741     Status status;
742     TF_RETURN_IF_ERROR(MaybeCopyTensorToGPU(alloc_attrs, parsed, tensor,
743                                             [&n, &status](const Status& s) {
744                                               status = s;
745                                               n.Notify();
746                                             }));
747     n.WaitForNotification();
748     return status;
749   }
750 }
751 
752 void BaseGPUDevice::CopyTensorInSameDevice(const Tensor* input_tensor,
753                                            Tensor* output_tensor,
754                                            const DeviceContext* device_context,
755                                            StatusCallback done) {
756   GPUUtil::CopyGPUTensorToSameGPU(static_cast<Device*>(this), device_context,
757                                   input_tensor, output_tensor, std::move(done));
758 }
759 
760 namespace {
761 class ConcretePerOpGpuDevice : public PerOpGpuDevice {
762  public:
763   ConcretePerOpGpuDevice() : device_(&stream_device_) {}
764 
765   void Reinitialize(OpKernelContext* context, const gpuStream_t* gpu_stream,
766                     TfGpuId tf_gpu_id, Allocator* base_allocator,
767                     char* scratch) {
768     stream_device_.Reinitialize(context, gpu_stream, tf_gpu_id, base_allocator,
769                                 scratch);
770   }
771 
772   const Eigen::GpuDevice& device() const override { return device_; }
773 
774  private:
775   EigenGpuStreamDevice stream_device_;
776   Eigen::GpuDevice device_;
777 };
778 
779 // Parse 'visible_device_list' into a list of platform GPU ids.
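// For example, visible_device_list = "2,0" yields visible_gpu_order =
// {PlatformGpuId(2), PlatformGpuId(0)}, i.e. virtual GPU 0 is backed by
// physical GPU 2 and virtual GPU 1 by physical GPU 0.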
780 Status ParseVisibleDeviceList(const string& visible_device_list,
781                               std::vector<PlatformGpuId>* visible_gpu_order) {
782   visible_gpu_order->clear();
783   se::Platform* gpu_manager = GPUMachineManager();
784 
785   // If the user has requested a specific visible-to-virtual GPU mapping,
786   // honor it here.
787   if (visible_device_list.empty()) {
788     visible_gpu_order->resize(gpu_manager->VisibleDeviceCount());
789     // By default, visible to virtual mapping is unchanged.
790     int deviceNo = 0;
791     std::generate(visible_gpu_order->begin(), visible_gpu_order->end(),
792                   [&deviceNo] { return deviceNo++; });
793   } else {
794     const std::vector<string> order_str =
795         str_util::Split(visible_device_list, ',');
796     for (const string& platform_gpu_id_str : order_str) {
797       int32 platform_gpu_id;
798       if (!strings::safe_strto32(platform_gpu_id_str, &platform_gpu_id)) {
799         return errors::InvalidArgument(
800             "Could not parse entry in 'visible_device_list': '",
801             platform_gpu_id_str,
802             "'. visible_device_list = ", visible_device_list);
803       }
804       if (platform_gpu_id < 0 ||
805           platform_gpu_id >= gpu_manager->VisibleDeviceCount()) {
806         return errors::InvalidArgument(
807             "'visible_device_list' listed an invalid GPU id '", platform_gpu_id,
808             "' but visible device count is ",
809             gpu_manager->VisibleDeviceCount());
810       }
811       visible_gpu_order->push_back(PlatformGpuId(platform_gpu_id));
812     }
813   }
814 
815   // Validate no repeats.
816   std::set<PlatformGpuId> visible_device_set(visible_gpu_order->begin(),
817                                              visible_gpu_order->end());
818   if (visible_device_set.size() != visible_gpu_order->size()) {
819     return errors::InvalidArgument(
820         "visible_device_list contained a duplicate entry: ",
821         visible_device_list);
822   }
823   return Status::OK();
824 }
825 
826 Status VerifyVirtualDeviceSettings(
827     const size_t num_gpus_to_use, const GPUOptions& gpu_options,
828     const std::vector<PlatformGpuId>& visible_gpu_order,
829     const std::vector<PlatformGpuId>& valid_platform_gpu_ids,
830     const std::map<int, std::pair<int, int>>& supported_priority_ranges) {
831   const auto& virtual_devices = gpu_options.experimental().virtual_devices();
832   CHECK(!virtual_devices.empty());
833   if (gpu_options.per_process_gpu_memory_fraction() > 0) {
834     return errors::InvalidArgument(
835         "It's invalid to set per_process_gpu_memory_fraction when "
836         "virtual_devices is set.");
837   }
838   if (num_gpus_to_use < virtual_devices.size()) {
839     return errors::Unknown(
840         "Not enough GPUs to create virtual devices."
841         " num_gpus_to_use: ",
842         num_gpus_to_use, " #virtual_devices: ", virtual_devices.size());
843   }
844   if (!gpu_options.visible_device_list().empty() &&
845       visible_gpu_order.size() != virtual_devices.size()) {
846     return errors::InvalidArgument(
847         "The number of GPUs in visible_device_list doesn't match the number "
848         "of elements in the virtual_devices list.",
849         " #GPUs in visible_device_list: ", visible_gpu_order.size(),
850         " virtual_devices.size(): ", virtual_devices.size());
851   }
852   if (valid_platform_gpu_ids.size() != virtual_devices.size()) {
853     return errors::Unknown(
854         "The number of valid GPUs doesn't match the number of elements in "
855         "the virtual_devices list.",
856         " #valid GPUs: ", valid_platform_gpu_ids.size(),
857         " virtual_devices.size(): ", virtual_devices.size());
858   }
859 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
860   // Check that memory_limit_mb and priority sizes match if priority is non-empty.
861   bool priority_exists = !virtual_devices.Get(0).priority().empty();
862   for (int i = 0; i < virtual_devices.size(); ++i) {
863     const auto& memory_limit_mb = virtual_devices.Get(i).memory_limit_mb();
864     const auto& priority = virtual_devices.Get(i).priority();
865     // If the priority is empty in the first one then treat this as having no
866     // priority set in any of the virtual devices for backward compatibility.
867     // Either it's set for all or none.
868     if (!priority_exists) {
869       if (!priority.empty()) {
870         return errors::InvalidArgument(
871             "Priority must be set for all virtual devices or none. But the "
872             "priority is specified for ",
873             i,
874             " while previous devices didn't "
875             "have any set.");
876       }
877     }
878     if (priority_exists && memory_limit_mb.size() != priority.size()) {
879       return errors::InvalidArgument(
880           "Number of virtual device priorities specified doesn't "
881           "match with number of memory_limit_mb specified for GPU# ",
882           i, " memory_limit_mb size: ", memory_limit_mb.size(),
883           " and priority size: ", priority.size());
884     }
885     const int gpu_id = valid_platform_gpu_ids[i].value();
886     auto it = supported_priority_ranges.find(gpu_id);
887     if (it == supported_priority_ranges.end()) {
888       return errors::Internal(
889           "Failed to find supported priority range for GPU"
890           " device ",
891           gpu_id);
892     }
893     const std::pair<int, int>& priority_range = it->second;
894     for (int p : priority) {
895       if (p > priority_range.first || p < priority_range.second) {
896         return errors::InvalidArgument(
897             "Priority ", p,
898             " is outside the range of supported priorities "
899             "[",
900             priority_range.second, ",", priority_range.first,
901             "] for virtual device ", i, " on GPU# ", gpu_id);
902       }
903     }
904   }
905 #endif
906 
907   return Status::OK();
908 }
909 
910 int64 MinSystemMemory(int64 available_memory, int cc_major) {
911   // We use the following heuristic for now:
912   //
913   // If the available_memory is < 2GiB, we allocate 225MiB to system memory.
914   // Otherwise, depending on the capability version assign
915   //  500MiB (for cuda_compute_capability <= 6.x) or
916   // 1050MiB (for cuda_compute_capability <= 7.x) or
917   // 1536MiB (for cuda_compute_capability >= 8.x)
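  //
  // Worked example (illustrative): a 16 GiB GPU with compute capability 7.5
  // falls into the cc_major <= 7 bucket, so 1050 MiB is reserved for the
  // system in an opt build; non-opt builds double this below.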
918   int64 min_system_memory;
919   if (available_memory < (1LL << 31)) {
920     min_system_memory = 225 * 1024 * 1024;
921   } else {
922     if (cc_major <= 6) {
923       min_system_memory = 500 * 1024 * 1024;
924     } else if (cc_major <= 7) {
925       min_system_memory = 1050 * 1024 * 1024;
926     } else {
927       min_system_memory = 1536 * 1024 * 1024;
928     }
929   }
930 #if defined(__GNUC__) && defined(__OPTIMIZE__)
931 // Do nothing
932 #elif !defined(__GNUC__) && defined(NDEBUG)
933 // Do nothing
934 #else
935   // Double the amount of memory reserved for the system in non-opt builds
936   // (debug builds on Windows), because non-opt builds need more system
937   // memory.
938   min_system_memory *= 2;
939 #endif
940 
941 #if defined(ANDROID_TEGRA)
942   // 1GB system mem for NVIDIA Tegra devices since they use the same mem for
943   // RAM and Video RAM
944   min_system_memory = 1 << 30;
945 #endif
946 
947   VLOG(5) << "available_memory = " << available_memory;
948   VLOG(5) << "min_system_memory = " << min_system_memory;
949   return min_system_memory;
950 }
951 
952 // Get the memory limit for the virtual device being created on GPU with
953 // 'platform_gpu_id', when that virtual device is the only virtual device being
954 // created on that GPU.
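// For example (illustrative): with per_process_gpu_memory_fraction = 0.5 on a
// 16 GiB GPU the limit is 8 GiB; with the default of 0 it is the currently
// available memory minus MinSystemMemory().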
955 Status SingleVirtualDeviceMemoryLimit(const GPUOptions& gpu_options,
956                                       PlatformGpuId platform_gpu_id,
957                                       int64* memory_limit) {
958   int64 total_memory = 0;
959   int64 available_memory = 0;
960   se::StreamExecutor* se = DeviceIdUtil::ExecutorForPlatformDeviceId(
961                                GPUMachineManager(), platform_gpu_id)
962                                .ValueOrDie();
963   if (!se->DeviceMemoryUsage(&available_memory, &total_memory)) {
964     return errors::Unknown("Failed to query available memory for GPU ",
965                            platform_gpu_id.value());
966   }
967 
968   int64 allocated_memory = 0;
969   const double per_process_gpu_memory_fraction =
970       gpu_options.per_process_gpu_memory_fraction();
971   int cc_major = 0, cc_minor = 0;
972   if (!se->GetDeviceDescription().cuda_compute_capability(&cc_major,
973                                                           &cc_minor)) {
974     return errors::Internal("Failed to get compute capability for device.");
975   }
976   if (per_process_gpu_memory_fraction > 1.0 ||
977       gpu_options.experimental().use_unified_memory()) {
978     if (cc_major < 6) {
979       return errors::Internal(
980           "Unified memory on GPUs with compute capability lower than 6.0 "
981           "(pre-Pascal class GPUs) does not support oversubscription.");
982     }
983   }
984 
985   if (per_process_gpu_memory_fraction == 0) {
986     allocated_memory = available_memory;
987     const int64 min_system_memory = MinSystemMemory(available_memory, cc_major);
988     if (min_system_memory < allocated_memory) {
989       allocated_memory -= min_system_memory;
990     }
991   } else {
992     allocated_memory = total_memory * per_process_gpu_memory_fraction;
993   }
994 
995   // Override the excluded memory when TF_DEVICE_MIN_SYS_MEMORY_IN_MB is set.
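  // For example (illustrative), TF_DEVICE_MIN_SYS_MEMORY_IN_MB=2048 reserves
  // 2 GiB for the system and hands the remaining available memory to
  // TensorFlow, overriding per_process_gpu_memory_fraction.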
996   const char* force_device_reserved_bytes =
997       std::getenv("TF_DEVICE_MIN_SYS_MEMORY_IN_MB");
998   if (force_device_reserved_bytes != nullptr &&
999       strcmp(force_device_reserved_bytes, "") != 0) {
1000     int32 reserved_mb;
1001     if (!strings::safe_strto32(force_device_reserved_bytes, &reserved_mb) ||
1002         reserved_mb < 0) {
1003       LOG(WARNING) << "The requested reserved device memory "
1004                    << force_device_reserved_bytes
1005                    << " is invalid. The request will be ignored.";
1006     } else {
1007       // Convert MBytes to Bytes.
1008       size_t allowable_reserved_memory = reserved_mb * 1024 * 1024;
1009       // TF_DEVICE_MIN_SYS_MEMORY_IN_MB overrides
1010       // per_process_gpu_memory_fraction.
1011       if (allowable_reserved_memory <= available_memory) {
1012         allocated_memory = available_memory - allowable_reserved_memory;
1013         VLOG(1) << "Setting the GPU reserved bytes to "
1014                 << strings::HumanReadableNumBytes(allowable_reserved_memory)
1015                 << " as requested by TF_DEVICE_MIN_SYS_MEMORY_IN_MB.";
1016       } else {
1017         LOG(WARNING) << "The requested reserved device memory "
1018                      << strings::HumanReadableNumBytes(
1019                             allowable_reserved_memory)
1020                      << " is larger than the available memory of "
1021                      << strings::HumanReadableNumBytes(available_memory)
1022                      << ". The request is ignored.";
1023       }
1024     }
1025   }
1026   *memory_limit = allocated_memory;
1027   return Status::OK();
1028 }
1029 }  // namespace
1030 
1031 void BaseGPUDevice::ReinitializeDevice(OpKernelContext* context,
1032                                        PerOpGpuDevice* device, int stream_id,
1033                                        Allocator* allocator) {
1034   ConcretePerOpGpuDevice* concrete_device =
1035       static_cast<ConcretePerOpGpuDevice*>(device);
1036   DCHECK(concrete_device);
1037   DCHECK_EQ(stream_id, 0);
1038   const gpuStream_t* gpu_stream = reinterpret_cast<const gpuStream_t*>(
1039       stream_->compute->implementation()->GpuStreamMemberHack());
1040   concrete_device->Reinitialize(context, gpu_stream, tf_gpu_id_, allocator,
1041                                 scratch_);
1042 }
1043 
1044 PerOpGpuDevice* BaseGPUDevice::MakeGpuDevice() {
1045   return new ConcretePerOpGpuDevice();
1046 }
1047 
1048 Status BaseGPUDevice::ReinitializeGpuDevice(OpKernelContext* context,
1049                                             PerOpGpuDevice* device,
1050                                             DeviceContext* dc,
1051                                             Allocator* allocator) {
1052   TF_RETURN_IF_ERROR(InitScratchBuffers());
1053   if (dc) {
1054     const GPUDeviceContext* gpu_dc = static_cast<GPUDeviceContext*>(dc);
1055     const int stream_id = gpu_dc->stream_id();
1056     VLOG(1) << "  eigen_gpu_device(" << dc << ") => stream[" << stream_id
1057             << "]";
1058     CHECK_EQ(stream_id, 0);
1059     ReinitializeDevice(context, device, stream_id, allocator);
1060   } else {
1061     ReinitializeDevice(context, device, 0, allocator);
1062   }
1063   return Status::OK();
1064 }
1065 
1066 Allocator* BaseGPUDevice::GetScopedAllocator(AllocatorAttributes attr,
1067                                              int64 step_id) {
1068   if (attr.scope_id > 0) {
1069     return scoped_allocator_mgr_->GetContainer(step_id)->GetInstance(
1070         attr.scope_id);
1071   }
1072   LOG(FATAL) << "Unexpected call to BaseGPUDevice::GetScopedAllocator "
1073              << "attr.scope_id = " << attr.scope_id;
1074   return gpu_allocator_;
1075 }
1076 
1077 const int BaseGPUDeviceFactory::InterconnectMap::kSameDeviceStrength = 1000;
1078 const int BaseGPUDeviceFactory::InterconnectMap::kStreamExecutorStrength = 1;
1079 
1080 Status BaseGPUDeviceFactory::CacheDeviceIds() {
1081   if (!cached_device_ids_.empty()) {
1082     return Status::OK();
1083   }
1084 
1085   TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1086   se::Platform* gpu_manager = GPUMachineManager();
1087   if (gpu_manager == nullptr) {
1088     return Status::OK();
1089   }
1090 
1091   int device_count = gpu_manager->VisibleDeviceCount();
1092   if (device_count <= 0) {
1093     return Status::OK();
1094   }
1095 
1096   std::vector<PlatformGpuId> visible_gpu_order(device_count);
1097   std::iota(visible_gpu_order.begin(), visible_gpu_order.end(), 0);
1098   TF_RETURN_IF_ERROR(GetValidDeviceIds(visible_gpu_order, &cached_device_ids_));
1099   return Status::OK();
1100 }
1101 
1102 Status BaseGPUDeviceFactory::ListPhysicalDevices(std::vector<string>* devices) {
1103   TF_RETURN_IF_ERROR(CacheDeviceIds());
1104   for (PlatformGpuId platform_gpu_id : cached_device_ids_) {
1105     const string device_name =
1106         strings::StrCat("/physical_device:GPU:", platform_gpu_id.value());
1107     devices->push_back(device_name);
1108   }
1109 
1110   return Status::OK();
1111 }
1112 
1113 Status BaseGPUDeviceFactory::GetDeviceDetails(
1114     int device_index, std::unordered_map<string, string>* details) {
1115   TF_RETURN_IF_ERROR(CacheDeviceIds());
1116 
1117   if (device_index < 0 || device_index >= cached_device_ids_.size()) {
1118     return errors::Internal("Invalid device index: ", device_index);
1119   }
1120   PlatformGpuId platform_gpu_id = cached_device_ids_[device_index];
1121 
1122   TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1123   se::Platform* gpu_manager = GPUMachineManager();
1124   if (gpu_manager == nullptr) {
1125     return errors::Internal("Cannot get GPUMachineManager");
1126   }
1127   auto desc_status = gpu_manager->DescriptionForDevice(platform_gpu_id.value());
1128   if (!desc_status.ok()) {
1129     return desc_status.status();
1130   }
1131 
1132   auto desc = desc_status.ConsumeValueOrDie();
1133   (*details)["device_name"] = desc->name();
1134 #if GOOGLE_CUDA
1135   int cc_major, cc_minor;
1136   if (desc->cuda_compute_capability(&cc_major, &cc_minor)) {
1137     (*details)["compute_capability"] = strings::StrCat(cc_major, ".", cc_minor);
1138   }
1139 #endif  // GOOGLE_CUDA
1140   return Status::OK();
1141 }
1142 
1143 Status BaseGPUDeviceFactory::CreateDevices(
1144     const SessionOptions& options, const string& name_prefix,
1145     std::vector<std::unique_ptr<Device>>* devices) {
1146   TF_RETURN_IF_ERROR(ValidateGPUMachineManager());
1147   se::Platform* gpu_manager = GPUMachineManager();
1148   if (gpu_manager == nullptr) {
1149     return Status::OK();
1150   }
1151   // If there are no GPUs visible, do nothing.
1152   if (gpu_manager->VisibleDeviceCount() <= 0) {
1153     return Status::OK();
1154   }
1155 
1156   size_t num_gpus_to_use = INT_MAX;
1157   auto iter = options.config.device_count().find("GPU");
1158   if (iter != options.config.device_count().end()) {
1159     num_gpus_to_use = iter->second;
1160   }
1161   const auto& gpu_options = options.config.gpu_options();
1162   std::vector<PlatformGpuId> visible_gpu_order;
1163   std::vector<PlatformGpuId> valid_platform_gpu_ids;
1164   // If we aren't going to use any GPUs, don't initialize them.
1165   // We don't want to call ParseVisibleDeviceList if num_gpus_to_use is 0,
1166   // because it treats an empty gpu_options.visible_device_list as 'all GPUs
1167   // are visible'.
1168   if (num_gpus_to_use > 0) {
1169     TF_RETURN_IF_ERROR(ParseVisibleDeviceList(gpu_options.visible_device_list(),
1170                                               &visible_gpu_order));
1171     bool new_gpu_found = false;
1172     for (int i = 0; i < visible_gpu_order.size(); ++i) {
1173       int visible_gpu_id = visible_gpu_order[i].value();
1174 
1175       // Only perform this once per visible gpu id.
1176       if (visible_gpu_initialized_[visible_gpu_id]) {
1177         continue;
1178       }
1179 
1180       visible_gpu_initialized_[visible_gpu_id] = true;
1181       new_gpu_found = true;
1182     }
1183 
1184     // Check peering and show the interconnect matrix if more than one GPU was found.
1185     if (new_gpu_found && visible_gpu_order.size() > 1) {
1186       // Enable peer access
1187       TF_RETURN_IF_ERROR(EnablePeerAccess(visible_gpu_order));
1188     }
1189 
1190     TF_RETURN_IF_ERROR(
1191         GetValidDeviceIds(visible_gpu_order, &valid_platform_gpu_ids));
1192   }
1193   if (num_gpus_to_use > valid_platform_gpu_ids.size()) {
1194     num_gpus_to_use = valid_platform_gpu_ids.size();
1195   }
1196   std::map<int, std::pair<int, int>> supported_priority_ranges;
1197   if (!valid_platform_gpu_ids.empty()) {
1198     // Save the original device.
1199     int original_device = 0;
1200 #if GOOGLE_CUDA
1201     cudaError_t err = cudaGetDevice(&original_device);
1202     if (err != cudaSuccess) {
1203       return errors::Internal("cudaGetDevice() failed. Status: ",
1204                               cudaGetErrorString(err));
1205     }
1206 #elif TENSORFLOW_USE_ROCM
1207     hipError_t err = hipGetDevice(&original_device);
1208     if (err != hipSuccess) {
1209       return errors::Internal("hipGetDevice() failed. Status: ",
1210                               hipGetErrorString(err));
1211     }
1212 #endif
1213 
1214     // Force the CUDA/ROCm runtime to initialize implicitly on each valid
1215     // GPU before CreateGPUDevice().
1216     for (PlatformGpuId platform_gpu_id : valid_platform_gpu_ids) {
1217 #if GOOGLE_CUDA
1218       err = cudaSetDevice(platform_gpu_id.value());
1219       if (err != cudaSuccess) {
1220         return errors::Internal(
1221             "cudaSetDevice() on GPU:", platform_gpu_id.value(),
1222             " failed. Status: ", cudaGetErrorString(err));
1223       }
1224       err = cudaFree(nullptr);
1225       if (err != cudaSuccess) {
1226         return errors::Internal("CUDA runtime implicit initialization on GPU:",
1227                                 platform_gpu_id.value(),
1228                                 " failed. Status: ", cudaGetErrorString(err));
1229       }
1230       int priority_low, priority_high;
1231       err = cudaDeviceGetStreamPriorityRange(&priority_low, &priority_high);
1232       if (err != cudaSuccess) {
1233         return errors::Internal(
1234             "cudaDeviceGetStreamPriorityRange() on GPU:", original_device,
1235             " failed. Status: ", cudaGetErrorString(err));
1236       }
1237       VLOG(1) << "Cuda stream priority range on GPU(" << original_device
1238               << "): " << priority_high << "," << priority_low;
1239       supported_priority_ranges.insert(
1240           std::make_pair(platform_gpu_id.value(),
1241                          std::make_pair(priority_low, priority_high)));
1242 #elif TENSORFLOW_USE_ROCM
1243       err = hipSetDevice(platform_gpu_id.value());
1244       if (err != hipSuccess) {
1245         return errors::Internal(
1246             "hipSetDevice() on GPU:", platform_gpu_id.value(),
1247             " failed. Status: ", hipGetErrorString(err));
1248       }
1249       err = hipFree(nullptr);
1250       if (err != hipSuccess) {
1251         return errors::Internal("ROCm runtime implicit initialization on GPU:",
1252                                 platform_gpu_id.value(),
1253                                 " failed. Status: ", hipGetErrorString(err));
1254       }
1255       int priority_low, priority_high;
1256       err = hipDeviceGetStreamPriorityRange(&priority_low, &priority_high);
1257       if (err != hipSuccess) {
1258         return errors::Internal(
1259             "hipDeviceGetStreamPriorityRange() on GPU:", original_device,
1260             " failed. Status: ", hipGetErrorString(err));
1261       }
1262       VLOG(1) << "HIP stream priority range on GPU(" << original_device
1263               << "): " << priority_high << "," << priority_low;
1264       supported_priority_ranges.insert(
1265           std::make_pair(platform_gpu_id.value(),
1266                          std::make_pair(priority_low, priority_high)));
1267 #endif
1268     }
1269     // Reset to the original device.
1270 #if GOOGLE_CUDA
1271     err = cudaSetDevice(original_device);
1272     if (err != cudaSuccess) {
1273       return errors::Internal("cudaSetDevice() on GPU:", original_device,
1274                               " failed. Status: ", cudaGetErrorString(err));
1275     }
1276 #elif TENSORFLOW_USE_ROCM
1277     err = hipSetDevice(original_device);
1278     if (err != hipSuccess) {
1279       return errors::Internal("hipSetDevice() on GPU:", original_device,
1280                               " failed. Status: ", hipGetErrorString(err));
1281     }
1282 #endif
1283 
1284 #if GOOGLE_CUDA
1285     // Log the version of CUDA and cuDNN
1286     int cuda_major_version = CUDART_VERSION / 1000;
1287     int cuda_minor_version = (CUDART_VERSION / 10) % 10;
1288     VLOG(1) << "TensorFlow compiled with CUDA " << cuda_major_version << "."
1289             << cuda_minor_version << " and cuDNN " << CUDNN_MAJOR << "."
1290             << CUDNN_MINOR << "." << CUDNN_PATCHLEVEL;
1291 #endif
1292   }
1293 
1294   std::vector<InterconnectMap> interconnect_maps;
1295   TF_RETURN_IF_ERROR(
1296       GetInterconnectMaps(visible_gpu_order, gpu_manager, &interconnect_maps));
1297 
1298   // Print each interconnect map to the log.
1299   for (const InterconnectMap& im : interconnect_maps) {
1300     LOG(INFO) << "Device interconnect " << im.name << " with strength "
1301               << im.strength << " edge matrix:";
1302     string line_buf = "     ";
1303     for (int i = 0; i < visible_gpu_order.size(); ++i) {
1304       strings::StrAppend(&line_buf, visible_gpu_order[i].value(), " ");
1305     }
1306     LOG(INFO) << line_buf;
1307     for (int i = 0; i < visible_gpu_order.size(); ++i) {
1308       line_buf = strings::StrCat(visible_gpu_order[i].value(), ":   ");
1309       PlatformGpuId gpu_id_i = visible_gpu_order[i];
1310       for (int j = 0; j < visible_gpu_order.size(); ++j) {
1311         PlatformGpuId gpu_id_j = visible_gpu_order[j];
1312         if (im.directed_links.find({gpu_id_i, gpu_id_j}) !=
1313             im.directed_links.end()) {
1314           line_buf.append("Y ");
1315         } else {
1316           line_buf.append("N ");
1317         }
1318       }
1319       LOG(INFO) << line_buf;
1320     }
1321   }
1322 
1323   const auto& virtual_devices = gpu_options.experimental().virtual_devices();
1324   if (!virtual_devices.empty()) {
1325     TF_RETURN_IF_ERROR(VerifyVirtualDeviceSettings(
1326         num_gpus_to_use, gpu_options, visible_gpu_order, valid_platform_gpu_ids,
1327         supported_priority_ranges));
1328     // We've verified that num_gpus_to_use >= virtual_devices.size().
1329     num_gpus_to_use = virtual_devices.size();
1330     CHECK(gpu_options.visible_device_list().empty() ||
1331           valid_platform_gpu_ids == visible_gpu_order);
1332   }
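  // Assign consecutive TF GPU ids (and per-device memory limits) to the
  // virtual devices carved out of each valid platform GPU; a platform GPU
  // without explicit per-virtual-device memory limits yields a single
  // virtual device.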
1333   int next_tf_gpu_id = 0;
1334   std::vector<int64> memory_limit_bytes;
1335   for (int i = 0; i < num_gpus_to_use; ++i) {
1336     const PlatformGpuId platform_gpu_id = valid_platform_gpu_ids[i];
1337     if (virtual_devices.empty() ||
1338         virtual_devices.Get(i).memory_limit_mb_size() == 0) {
1339       int64 single_virtual_device_memory_limit = 0;
1340       TF_RETURN_IF_ERROR(SingleVirtualDeviceMemoryLimit(
1341           gpu_options, platform_gpu_id, &single_virtual_device_memory_limit));
1342       memory_limit_bytes.push_back(single_virtual_device_memory_limit);
1343     } else {
1344       const auto& memory_limit_mb = virtual_devices.Get(i).memory_limit_mb();
1345       std::transform(memory_limit_mb.begin(), memory_limit_mb.end(),
1346                      std::back_inserter(memory_limit_bytes), [](float mb) {
1347                        return static_cast<int64>(mb) * (1ll << 20);
1348                      });
1349     }
1350     while (next_tf_gpu_id < memory_limit_bytes.size()) {
1351       TfGpuId tf_gpu_id(next_tf_gpu_id);
1352       ++next_tf_gpu_id;
1353       TF_RETURN_IF_ERROR(
1354           GpuIdManager::InsertTfPlatformGpuIdPair(tf_gpu_id, platform_gpu_id));
1355     }
1356   }
1357   const int num_tf_gpus = next_tf_gpu_id;
1358 
1359   LocalityMap device_localities;
1360   TF_RETURN_IF_ERROR(
1361       GetDeviceLocalities(num_tf_gpus, interconnect_maps, &device_localities));
1362 
1363   // Build the GPUDevices
1364   CHECK_EQ(next_tf_gpu_id, memory_limit_bytes.size());
1365   for (int di = 0; di < num_tf_gpus; ++di) {
1366     TfGpuId tf_gpu_id(di);
1367     int64 bytes = memory_limit_bytes[di];
1368     auto it = device_localities.find(tf_gpu_id);
1369     if (it == device_localities.end()) {
1370       return errors::Internal("Failed to find DeviceLocality for GPU device ",
1371                               tf_gpu_id.value());
1372     }
1373     TF_RETURN_IF_ERROR(CreateGPUDevice(options, name_prefix, tf_gpu_id, bytes,
1374                                        it->second, num_tf_gpus, devices));
1375   }
1376   return Status::OK();
1377 }
1378 
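// Returns a short human-readable description of the GPU: device ordinal,
// name, PCI bus id and, in CUDA builds, the compute capability.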
1379 static string GetShortDeviceDescription(PlatformGpuId platform_gpu_id,
1380                                         const se::DeviceDescription& desc) {
1381 #if GOOGLE_CUDA
1382   int cc_major;
1383   int cc_minor;
1384   if (!desc.cuda_compute_capability(&cc_major, &cc_minor)) {
1385     cc_major = 0;
1386     cc_minor = 0;
1387   }
1388   // LINT.IfChange
1389   return strings::StrCat("device: ", platform_gpu_id.value(),
1390                          ", name: ", desc.name(),
1391                          ", pci bus id: ", desc.pci_bus_id(),
1392                          ", compute capability: ", cc_major, ".", cc_minor);
1393   // LINT.ThenChange(//tensorflow/python/framework/gpu_util.py)
1394 #elif TENSORFLOW_USE_ROCM
1395   return strings::StrCat("device: ", platform_gpu_id.value(),
1396                          ", name: ", desc.name(),
1397                          ", pci bus id: ", desc.pci_bus_id());
1398 #endif
1399 }
1400 
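// Creates and initializes a single BaseGPUDevice named
// "<name_prefix>/device:GPU:<tf_gpu_id>", backed by the process-wide GPU
// allocator for that id, and appends it to 'devices'.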
1401 Status BaseGPUDeviceFactory::CreateGPUDevice(
1402     const SessionOptions& options, const string& name_prefix, TfGpuId tf_gpu_id,
1403     int64 memory_limit, const DeviceLocality& dev_locality, size_t num_tf_gpus,
1404     std::vector<std::unique_ptr<Device>>* devices) {
1405   CHECK_GE(tf_gpu_id.value(), 0);
1406   const string device_name =
1407       strings::StrCat(name_prefix, "/device:GPU:", tf_gpu_id.value());
1408   DeviceIdUtil::CheckValidTfDeviceId(DEVICE_GPU, GPUMachineManager(),
1409                                      tf_gpu_id);
1410   PlatformGpuId platform_gpu_id;
1411   TF_RETURN_IF_ERROR(
1412       GpuIdManager::TfToPlatformGpuId(tf_gpu_id, &platform_gpu_id));
1413   int numa_node = dev_locality.numa_node();
1414 
1415   se::Platform* gpu_manager = GPUMachineManager();
1416   auto desc_status = gpu_manager->DescriptionForDevice(platform_gpu_id.value());
1417   if (!desc_status.ok()) {
1418     return desc_status.status();
1419   }
1420   auto desc = desc_status.ConsumeValueOrDie();
1421 
1422   std::vector<TfGpuId> peer_gpu_ids;
1423   peer_gpu_ids.reserve(num_tf_gpus);
1424   for (int id = 0; id < num_tf_gpus; ++id) {
1425     TfGpuId peer_tf_gpu_id(id);
1426     if (peer_tf_gpu_id != tf_gpu_id) {
1427       peer_gpu_ids.push_back(peer_tf_gpu_id);
1428     }
1429   }
1430 
1431   GPUProcessState* process_state = GPUProcessState::singleton();
1432   Allocator* gpu_allocator = process_state->GetGPUAllocator(
1433       options.config.gpu_options(), tf_gpu_id, memory_limit, peer_gpu_ids);
1434   if (gpu_allocator == nullptr) {
1435     return errors::Internal("Failed to get memory allocator for TF GPU ",
1436                             tf_gpu_id.value(), " with ", memory_limit,
1437                             " bytes of memory.");
1438   }
1439   absl::optional<AllocatorStats> stats = gpu_allocator->GetStats();
1440   if (!stats) {
1441     return errors::Internal("No allocator statistics");
1442   }
1443   // 'memory_limit' is the requested memory size, but if an allocator for the
1444   // given tf_gpu_id was created earlier, we reuse it instead of creating a
1445   // new one (the TF GPU device is a shared resource). In that case the actual
1446   // memory limit reported by 'stats->bytes_limit' for that allocator may
1447   // differ, which should be reported as an error.
1448   //
1449   // TODO(laigd): report error if memory_limit doesn't match
1450   // stats->bytes_limit.
1451   int64 bytes_limit = stats->bytes_limit ? *stats->bytes_limit : 0;
1452   std::unique_ptr<BaseGPUDevice> gpu_device = CreateGPUDevice(
1453       options, device_name, static_cast<Bytes>(bytes_limit), dev_locality,
1454       tf_gpu_id, GetShortDeviceDescription(platform_gpu_id, *desc),
1455       gpu_allocator, ProcessState::singleton()->GetCPUAllocator(numa_node));
1456   LOG(INFO) << "Created TensorFlow device (" << device_name << " with "
1457             << (bytes_limit >> 20) << " MB memory) -> physical GPU ("
1458             << GetShortDeviceDescription(platform_gpu_id, *desc) << ")";
1459   TF_RETURN_IF_ERROR(gpu_device->Init(options));
1460   devices->push_back(std::move(gpu_device));
1461 
1462   return Status::OK();
1463 }
1464 
1465 namespace {
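// Returns a map from each ordered pair (i, j) of visible GPUs to whether the
// StreamExecutor reports that peer access from i to j can be enabled.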
1466 std::unique_ptr<std::map<std::pair<PlatformGpuId, PlatformGpuId>, bool>>
1467 GetPeerAccessMap(se::Platform* platform,
1468                  const std::vector<PlatformGpuId>& visible_gpu_order) {
1469   std::unique_ptr<std::map<std::pair<PlatformGpuId, PlatformGpuId>, bool>> map(
1470       new std::map<std::pair<PlatformGpuId, PlatformGpuId>, bool>);
1471   for (PlatformGpuId platform_gpu_i : visible_gpu_order) {
1472     for (PlatformGpuId platform_gpu_j : visible_gpu_order) {
1473       se::StreamExecutor* from =
1474           DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_gpu_i)
1475               .ValueOrDie();
1476       se::StreamExecutor* to =
1477           DeviceIdUtil::ExecutorForPlatformDeviceId(platform, platform_gpu_j)
1478               .ValueOrDie();
1479       (*map)[{platform_gpu_i, platform_gpu_j}] =
1480           from->CanEnablePeerAccessTo(to);
1481     }
1482   }
1483 
1484   return map;
1485 }
1486 
1487 }  // namespace
1488 
1489 Status BaseGPUDeviceFactory::GetInterconnectMaps(
1490     const std::vector<PlatformGpuId>& visible_gpu_order,
1491     se::Platform* gpu_manager, std::vector<InterconnectMap>* maps) {
1492   // The default interconnect map is obtained from the StreamExecutor.
1493   auto access_map = GetPeerAccessMap(gpu_manager, visible_gpu_order);
1494   maps->resize(1);
1495   InterconnectMap& imap = maps->at(0);
1496   imap.name = "StreamExecutor";
1497   imap.strength = InterconnectMap::kStreamExecutorStrength;
1498   for (PlatformGpuId gpu_id_i : visible_gpu_order) {
1499     for (PlatformGpuId gpu_id_j : visible_gpu_order) {
1500       if (gpu_id_i == gpu_id_j) continue;
1501       if ((*access_map)[{gpu_id_i, gpu_id_j}]) {
1502         imap.directed_links.insert({gpu_id_i, gpu_id_j});
1503       }
1504     }
1505   }
1506   return Status::OK();
1507 }
1508 
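// Computes a DeviceLocality for each TF GPU id: its NUMA node and bus id, the
// interconnect links derived from 'interconnects', and SAME_DEVICE links
// between virtual GPUs that share a physical GPU.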
1509 Status BaseGPUDeviceFactory::GetDeviceLocalities(
1510     int num_tf_gpus, const std::vector<InterconnectMap>& interconnects,
1511     LocalityMap* localities) {
1512   std::vector<TfGpuId> all_tf_gpu_ids;
1513   all_tf_gpu_ids.reserve(num_tf_gpus);
1514   for (int i = 0; i < num_tf_gpus; ++i) {
1515     all_tf_gpu_ids.push_back(TfGpuId(i));
1516   }
1517   for (TfGpuId tf_gpu_id : all_tf_gpu_ids) {
1518     PlatformGpuId platform_gpu_id;
1519     TF_RETURN_IF_ERROR(
1520         GpuIdManager::TfToPlatformGpuId(tf_gpu_id, &platform_gpu_id));
1521     // Get GPU bus_id from its reported NUMA affinity.  Because GPUs are
1522     // virtualized in some environments, we can't just use the GPU id.
1523     // NUMA locales are indexed from 0, buses are indexed from 1.
1524     se::Platform* gpu_manager = GPUMachineManager();
1525     auto desc_status =
1526         gpu_manager->DescriptionForDevice(platform_gpu_id.value());
1527     if (!desc_status.ok()) {
1528       return desc_status.status();
1529     }
1530     auto desc = desc_status.ConsumeValueOrDie();
1531     int numa_node = desc->numa_node();
1532     if (numa_node < 0) {
1533       // For some reason the StreamExecutor couldn't get the NUMA
1534       // affinity of the GPU.  If this is not a multi-socket mobo with
1535       // GPUs local to different buses, it doesn't matter.  If it is, we
1536       // may run into trouble later with data transfer operations.  The
1537       // trouble may manifest as slower than expected performance, or
1538       // outright failures.
1539       LOG(INFO) << "Could not identify NUMA node of platform GPU id "
1540                 << platform_gpu_id
1541                 << ", defaulting to 0.  Your kernel may not have been built "
1542                 << "with NUMA support.";
1543       numa_node = 0;
1544     }
1545     DeviceLocality dev_locality;
1546     dev_locality.set_numa_node(numa_node);
1547     dev_locality.set_bus_id(numa_node + 1);
1548 
1549     // Set LocalLinks from InterconnectMaps.
1550     LocalLinks* links = dev_locality.mutable_links();
1551     for (const InterconnectMap& imap : interconnects) {
1552       for (TfGpuId tf_gpu_dst : all_tf_gpu_ids) {
1553         PlatformGpuId platform_gpu_dst;
1554         TF_RETURN_IF_ERROR(
1555             GpuIdManager::TfToPlatformGpuId(tf_gpu_dst, &platform_gpu_dst));
1556         if (imap.directed_links.find({platform_gpu_id, platform_gpu_dst}) !=
1557             imap.directed_links.end()) {
1558           InterconnectLink* ilink = links->add_link();
1559           ilink->set_device_id(tf_gpu_dst.value());
1560           ilink->set_type(imap.name);
1561           ilink->set_strength(imap.strength);
1562         }
1563       }
1564     }
1565 
1566     // If this is one of multiple virtual GPUs on the same physical GPU
1567     // add high strength links to the others.
1568     for (TfGpuId tf_gpu_dst : all_tf_gpu_ids) {
1569       if (tf_gpu_id == tf_gpu_dst) continue;
1570       PlatformGpuId platform_gpu_dst;
1571       TF_RETURN_IF_ERROR(
1572           GpuIdManager::TfToPlatformGpuId(tf_gpu_dst, &platform_gpu_dst));
1573       if (platform_gpu_id == platform_gpu_dst) {
1574         InterconnectLink* ilink = links->add_link();
1575         ilink->set_device_id(tf_gpu_dst.value());
1576         ilink->set_type("SAME_DEVICE");
1577         ilink->set_strength(InterconnectMap::kSameDeviceStrength);
1578       }
1579     }
1580 
1581     (*localities)[tf_gpu_id] = dev_locality;
1582     VLOG(1) << "GPUDevice PlatformGpuId " << platform_gpu_id << " TfGpuId "
1583             << tf_gpu_id << " on bus " << dev_locality.bus_id()
1584             << " numa: " << numa_node << " pci: " << desc->pci_bus_id()
1585             << " DeviceLocality: " << dev_locality.DebugString();
1586   }
1587   return Status::OK();
1588 }
1589 
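// Returns the default threshold for filtering out slow GPUs: the highest
// multiprocessor count among the visible GPUs, capped at
// kDefaultMinGPUMultiprocessorCount (and falling back to that constant when
// no device description is available).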
1590 static int GetDefaultMinGPUMultiprocessorCount(
1591     se::Platform* gpu_manager,
1592     const std::vector<PlatformGpuId>& visible_gpu_order) {
1593   static const int kDefaultMinGPUMultiprocessorCount = 8;
1594 
1595   // Find the highest multi-processor count across all visible GPUs.
1596   int max_count = -1;
1597   for (int i = 0; i < visible_gpu_order.size(); ++i) {
1598     int visible_gpu_id = visible_gpu_order[i].value();
1599     auto description_status = gpu_manager->DescriptionForDevice(visible_gpu_id);
1600     if (!description_status.ok()) {
1601       continue;
1602     }
1603 
1604     auto description = description_status.ConsumeValueOrDie();
1605     max_count = std::max(max_count, description->core_count());
1606   }
1607 
1608   if (max_count < 0 || kDefaultMinGPUMultiprocessorCount < max_count) {
1609     return kDefaultMinGPUMultiprocessorCount;
1610   } else {
1611     return max_count;
1612   }
1613 }
1614 
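// Returns the minimum multiprocessor count a GPU must have to be used.  The
// value comes from the TF_MIN_GPU_MULTIPROCESSOR_COUNT environment variable
// when it parses as a non-negative integer (e.g. running with
// TF_MIN_GPU_MULTIPROCESSOR_COUNT=4), otherwise from
// GetDefaultMinGPUMultiprocessorCount().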
1615 static int GetMinGPUMultiprocessorCount(
1616     se::Platform* gpu_manager,
1617     const std::vector<PlatformGpuId>& visible_gpu_order) {
1618   const char* tf_min_gpu_core_count = getenv("TF_MIN_GPU_MULTIPROCESSOR_COUNT");
1619 
1620   if (tf_min_gpu_core_count == nullptr ||
1621       strcmp(tf_min_gpu_core_count, "") == 0) {
1622     return GetDefaultMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1623   }
1624 
1625   int min_gpu_core_count = -1;
1626   if (strings::safe_strto32(tf_min_gpu_core_count, &min_gpu_core_count)) {
1627     if (min_gpu_core_count >= 0) {
1628       return min_gpu_core_count;
1629     }
1630   }
1631 
1632   int count =
1633       GetDefaultMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1634   LOG(ERROR) << "Invalid minimum GPU multiprocessor count: ["
1635              << tf_min_gpu_core_count << "]. "
1636              << "Using the default value: " << count;
1637   return count;
1638 }
1639 
1640 namespace {
1641 
1642 #if GOOGLE_CUDA
1643 struct CudaVersion {
1644   // Initialize from version_name in the form of "3.5"
1645   explicit CudaVersion(const std::string& version_name) {
1646     size_t dot_pos = version_name.find('.');
1647     CHECK(dot_pos != string::npos)
1648         << "Illegal version name: [" << version_name << "]";
1649     string major_str = version_name.substr(0, dot_pos);
1650     CHECK(strings::safe_strto32(major_str, &major_part))
1651         << "Illegal version name: [" << version_name << "]";
1652     string minor_str = version_name.substr(dot_pos + 1);
1653     CHECK(strings::safe_strto32(minor_str, &minor_part))
1654         << "Illegal version name: [" << version_name << "]";
1655   }
1656   CudaVersion() {}
1657   bool operator<(const CudaVersion& other) const {
1658     if (this->major_part != other.major_part) {
1659       return this->major_part < other.major_part;
1660     }
1661     return this->minor_part < other.minor_part;
1662   }
1663   friend std::ostream& operator<<(std::ostream& os,
1664                                   const CudaVersion& version) {
1665     os << version.major_part << "." << version.minor_part;
1666     return os;
1667   }
1668   int major_part = -1;
1669   int minor_part = -1;
1670 };
1671 
1672 std::vector<CudaVersion> supported_cuda_compute_capabilities = {
1673     CudaVersion("3.5"), CudaVersion("5.2")};
1674 
1675 std::vector<CudaVersion> GetSupportedCudaComputeCapabilities() {
1676   auto cuda_caps = supported_cuda_compute_capabilities;
1677 #ifdef TF_EXTRA_CUDA_CAPABILITIES
1678 // TF_EXTRA_CUDA_CAPABILITIES should be defined as a comma-separated sequence,
1679 // for example:
1680 //   TF_EXTRA_CUDA_CAPABILITIES=3.0,4.0,5.0
1681 // Use two-level macro expansion for stringification.
1682 #define TF_XSTRING(...) #__VA_ARGS__
1683 #define TF_STRING(s) TF_XSTRING(s)
1684   string extra_cuda_caps = TF_STRING(TF_EXTRA_CUDA_CAPABILITIES);
1685 #undef TF_STRING
1686 #undef TF_XSTRING
1687   auto extra_capabilities = str_util::Split(extra_cuda_caps, ',');
1688   for (const auto& capability : extra_capabilities) {
1689     cuda_caps.push_back(CudaVersion(capability));
1690   }
1691 #endif
1692   return cuda_caps;
1693 }
1694 #endif  // GOOGLE_CUDA
1695 
1696 #if TENSORFLOW_USE_ROCM
1697 std::vector<int> supported_amdgpu_isa_versions = {803, 900, 906, 908};
1698 
1699 std::vector<int> GetSupportedAMDGPUISAVersions() {
1700   return supported_amdgpu_isa_versions;
1701 }
1702 #endif  // TENSORFLOW_USE_ROCM
1703 
1704 }  // namespace
1705 
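// Attempts to enable peer access between every ordered pair of visible GPUs.
// Returns an error only if the driver reported at least one possible peering
// but none could actually be enabled.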
1706 Status BaseGPUDeviceFactory::EnablePeerAccess(
1707     const std::vector<PlatformGpuId>& visible_gpu_order) {
1708   se::Platform* gpu_manager = GPUMachineManager();
1709   int possible_peer_count = 0;
1710   int enabled_peer_count = 0;
1711   for (int i = 0; i < visible_gpu_order.size(); ++i) {
1712     const PlatformGpuId platform_gpu_i = visible_gpu_order[i];
1713     for (int j = 0; j < visible_gpu_order.size(); ++j) {
1714       const PlatformGpuId platform_gpu_j = visible_gpu_order[j];
1715       // We have already validated that ExecutorForDevice() calls return OK.
1716       se::StreamExecutor* from =
1717           DeviceIdUtil::ExecutorForPlatformDeviceId(gpu_manager, platform_gpu_i)
1718               .ValueOrDie();
1719       se::StreamExecutor* to =
1720           DeviceIdUtil::ExecutorForPlatformDeviceId(gpu_manager, platform_gpu_j)
1721               .ValueOrDie();
1722 
1723       if (from->CanEnablePeerAccessTo(to)) {
1724         ++possible_peer_count;
1725         auto status = from->EnablePeerAccessTo(to);
1726         if (!status.ok()) {
1727           LOG(WARNING)
1728               << "Unable to enable peer access between device ordinals "
1729               << platform_gpu_i << " and " << platform_gpu_j
1730               << ", status: " << status;
1731         } else {
1732           ++enabled_peer_count;
1733         }
1734       }
1735     }
1736   }
1737 
1738   // Return an error in the extreme failure case where the driver
1739   // reported that peering was possible but not a single peering was
1740   // successful.  This is to catch possible system misconfigurations
1741   // or more fundamental issues.
1742   if (possible_peer_count > 0 && enabled_peer_count == 0) {
1743     return errors::Internal(possible_peer_count,
1744                             " potential peer access pairs were reported by the "
1745                             "driver, but no peering could be enabled.");
1746   }
1747   return Status::OK();
1748 }
1749 
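// Logs the properties of each visible GPU and fills 'ids' with the subset
// that meets the minimum compute capability (CUDA) or ISA version (ROCm) and
// the minimum multiprocessor count.  Returns OK with an empty list if the
// required GPU libraries cannot be loaded.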
1750 Status BaseGPUDeviceFactory::GetValidDeviceIds(
1751     const std::vector<PlatformGpuId>& visible_gpu_order,
1752     std::vector<PlatformGpuId>* ids) {
1753   se::Platform* gpu_manager = GPUMachineManager();
1754   for (int i = 0; i < visible_gpu_order.size(); ++i) {
1755     int visible_gpu_id = visible_gpu_order[i].value();
1756     auto description_status = gpu_manager->DescriptionForDevice(visible_gpu_id);
1757     if (!description_status.ok()) {
1758       return description_status.status();
1759     }
1760 
1761     auto description = description_status.ConsumeValueOrDie();
1762 #if GOOGLE_CUDA
1763     int cc_major;
1764     int cc_minor;
1765     if (!description->cuda_compute_capability(&cc_major, &cc_minor)) {
1766       // Logs internally on failure.
1767       cc_major = 0;
1768       cc_minor = 0;
1769     }
1770     LOG(INFO) << "Found device " << i << " with properties: "
1771               << "\npciBusID: " << description->pci_bus_id()
1772               << " name: " << description->name()
1773               << " computeCapability: " << cc_major << "." << cc_minor
1774               << "\ncoreClock: " << description->clock_rate_ghz() << "GHz"
1775               << " coreCount: " << description->core_count()
1776               << " deviceMemorySize: "
1777               << strings::HumanReadableNumBytes(
1778                      description->device_memory_size())
1779               << " deviceMemoryBandwidth: "
1780               << strings::HumanReadableNumBytes(description->memory_bandwidth())
1781               << "/s";
1782 #elif TENSORFLOW_USE_ROCM
1783     std::string gcn_arch_name = description->rocm_amdgpu_gcn_arch_name();
1784     LOG(INFO) << "Found device " << i << " with properties: "
1785               << "\npciBusID: " << description->pci_bus_id()
1786               << " name: " << description->name()
1787               << "     ROCm AMDGPU Arch: " << gcn_arch_name
1788               << "\ncoreClock: " << description->clock_rate_ghz() << "GHz"
1789               << " coreCount: " << description->core_count()
1790               << " deviceMemorySize: "
1791               << strings::HumanReadableNumBytes(
1792                      description->device_memory_size())
1793               << " deviceMemoryBandwidth: "
1794               << strings::HumanReadableNumBytes(description->memory_bandwidth())
1795               << "/s";
1796 #endif
1797   }
1798 
1799 #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM
1800   // Try to dlopen GPU libraries if they are supposed to be dynamically loaded.
1801   auto handle_or = se::internal::DsoLoader::MaybeTryDlopenGPULibraries();
1802   if (!handle_or.ok()) {
1803     LOG(WARNING) << "Cannot dlopen some GPU libraries. Please make sure the "
1804                     "missing libraries mentioned above are installed properly "
1805                     "if you would like to use GPU. Follow the guide at "
1806                     "https://www.tensorflow.org/install/gpu for how to "
1807                     "download and setup the required libraries for your "
1808                     "platform.\nSkipping registering "
1809                     "GPU devices...";
1810     return Status::OK();
1811   }
1812 #endif
1813 
1814 #if GOOGLE_CUDA
1815   auto cuda_supported_capabilities = GetSupportedCudaComputeCapabilities();
1816   if (cuda_supported_capabilities.empty()) {
1817     return errors::FailedPrecondition(
1818         "No supported cuda capabilities in binary.");
1819   }
1820   CudaVersion min_supported_capability = *std::min_element(
1821       cuda_supported_capabilities.begin(), cuda_supported_capabilities.end());
1822 #elif TENSORFLOW_USE_ROCM
1823   auto rocm_supported_isas = GetSupportedAMDGPUISAVersions();
1824   if (rocm_supported_isas.empty()) {
1825     return errors::FailedPrecondition(
1826         "No supported rocm capabilities in binary.");
1827   }
1828   int min_supported_isa =
1829       *std::min_element(rocm_supported_isas.begin(), rocm_supported_isas.end());
1830 #endif
1831 
1832   int min_gpu_core_count =
1833       GetMinGPUMultiprocessorCount(gpu_manager, visible_gpu_order);
1834 
1835   // Filter out devices that don't have the right capability or power.
1836   for (int i = 0; i < visible_gpu_order.size(); ++i) {
1837     const PlatformGpuId visible_gpu_id = visible_gpu_order[i];
1838     auto description_status =
1839         gpu_manager->DescriptionForDevice(visible_gpu_id.value());
1840     if (!description_status.ok()) {
1841       LOG(INFO) << "Ignoring visible gpu device " << visible_gpu_id
1842                 << " whose executor is in invalid state: "
1843                 << description_status.status().ToString();
1844       continue;
1845     }
1846 
1847     auto desc = description_status.ConsumeValueOrDie();
1848 
1849 #if GOOGLE_CUDA
1850     CudaVersion device_capability;
1851     if (!desc->cuda_compute_capability(&device_capability.major_part,
1852                                        &device_capability.minor_part)) {
1853       LOG(INFO) << "Ignoring visible gpu device "
1854                 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1855                 << ") "
1856                 << "whose CUDA compute capability is not available.";
1857       continue;
1858     }
1859     // Only GPUs with at least the minimum supported compute capability are
1860     // accepted.
1861     if (device_capability < min_supported_capability) {
1862       LOG(INFO) << "Ignoring visible gpu device "
1863                 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1864                 << ") "
1865                 << "with Cuda compute capability " << device_capability
1866                 << ". The minimum required Cuda capability is "
1867                 << min_supported_capability << ".";
1868       continue;
1869     }
1870 #elif TENSORFLOW_USE_ROCM
1871     int device_isa;
1872     if (!desc->rocm_amdgpu_isa_version(&device_isa)) {
1873       continue;
1874     }
1875     // Only GPUs with at least the minimum supported AMDGPU ISA version are
1876     // accepted.
1877     if (device_isa < min_supported_isa) {
1878       LOG(INFO) << "Ignoring visible gpu device "
1879                 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1880                 << ") "
1881                 << "with AMDGPU ISA gfx" << device_isa
1882                 << ". The minimum required AMDGPU ISA is gfx"
1883                 << min_supported_isa << ".";
1884       continue;
1885     }
1886 #endif
1887 
1888     // Filter out slow GPUs. By default, GPUs with a lower multiprocessor
1889     // count than the fastest GPU are filtered out, unless they have 8 or more
1890     // multiprocessors. If the TF_MIN_GPU_MULTIPROCESSOR_COUNT environment
1891     // variable is set, its value will be used to filter out GPUs.
1892     if (desc->core_count() < min_gpu_core_count) {
1893       LOG(INFO) << "Ignoring visible gpu device "
1894                 << "(" << GetShortDeviceDescription(visible_gpu_id, *desc)
1895                 << ") "
1896                 << "with core count: " << desc->core_count()
1897                 << ". The minimum required count is " << min_gpu_core_count
1898                 << ". You can adjust this requirement with the env var "
1899                    "TF_MIN_GPU_MULTIPROCESSOR_COUNT.";
1900       continue;
1901     }
1902     ids->push_back(visible_gpu_id);
1903   }
1904   if (!ids->empty()) {
1905     std::vector<int> raw_ids(ids->size());
1906     std::transform(ids->begin(), ids->end(), raw_ids.begin(),
1907                    [](PlatformGpuId id) -> int { return id.value(); });
1908     LOG(INFO) << "Adding visible gpu devices: " << absl::StrJoin(raw_ids, ", ");
1909   }
1910 
1911   return Status::OK();
1912 }
1913 
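// Returns the allocation frontier that is safe to reuse: the kernel tracker's
// last terminated count when the timestamped allocator is in use, otherwise 0.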
1914 uint64 BaseGPUDevice::SafeAllocFrontier(uint64 old_value) {
1915   if (timestamped_allocator_) {
1916     return kernel_tracker_->LastTerminatedCount(old_value);
1917   } else {
1918     return 0;
1919   }
1920 }
1921 
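// Returns the weighted number of queued kernels that have not yet been
// observed to terminate, or 0 when kernel tracking is disabled.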
1922 int BaseGPUDevice::PendingKernels() {
1923   if (kernel_tracker_) {
1924     return kernel_tracker_->NumPending();
1925   }
1926   return 0;
1927 }
1928 
1929 void BaseGPUDevice::TestOnlyReset() {
1930   StreamGroupFactory::Global().TestOnlyReset();
1931 }
1932 
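// Accounts for the memory and op count of the kernel about to be launched.
// Once either the accumulated-bytes or op-interval threshold in params_ is
// reached, draws the next timing-counter value, records the kernel as queued
// with a weight derived from the accumulated bytes, and returns that value.
// Returns 0 when this kernel is not individually tracked.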
1933 uint64 GPUKernelTracker::MaybeQueue(OpKernelContext* ctx) {
1934   mutex_lock l(mu_);
1935   ++ops_since_last_;
1936   int64 mem_used =
1937       ctx->persistent_memory_allocated() + ctx->temp_memory_allocated();
1938   VLOG(2) << "kernel: " << ctx->op_kernel().name() << " mem_used: " << mem_used;
1939   mem_since_last_ += mem_used;
1940   int weight = 1;
1941   // Note that if all {max_bytes, max_interval, max_pending} are zero then
1942   // we track every single kernel with no pending cap.  This can happen if
1943   // timestamped_allocator alone was specified.
1944   if ((mem_since_last_ < params_.max_bytes) &&
1945       (ops_since_last_ < params_.max_interval)) {
1946     return 0;
1947   } else {
1948     weight = std::min(
1949         params_.max_pending,
1950         std::max(1, mem_since_last_ / std::max(16386, params_.max_bytes)));
1951     mem_since_last_ = 0;
1952     ops_since_last_ = 0;
1953   }
1954   uint64 queued_count = timing_counter_->next();
1955   RecordQueued(queued_count, weight);
1956   return queued_count;
1957 }
1958 
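// Records a kernel with the given queued_count and weight in the
// pending_kernels_ ring buffer, growing the buffer when it fills up.  Both
// callers in this file hold mu_ while calling this.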
1959 void GPUKernelTracker::RecordQueued(uint64 queued_count, int weight) {
1960   VLOG(2) << "RecordQueued queued_count=" << queued_count
1961           << " first_available_=" << first_available_
1962           << " last_completed_=" << last_completed_
1963           << " num_pending_=" << num_pending_;
1964   pending_kernels_[first_available_].queued_count = queued_count;
1965   pending_kernels_[first_available_].weight = weight;
1966   pending_kernels_[first_available_].terminated = false;
1967   ++first_available_;
1968   num_pending_ += weight;
1969   if (first_available_ >= pending_kernels_.size()) {
1970     if (last_completed_ >= 0) {
1971       // Wrap around to the start of the ring buffer.
1972       first_available_ = 0;
1973     } else {
1974       // enlarge the ring buffer
1975       pending_kernels_.resize(2 * pending_kernels_.size());
1976     }
1977   }
1978   if (first_available_ == last_completed_) {
1979     // The ring buffer is full: double it.  All of the valid PendingKernel
1980     // entries still exist after the copy; they are just shifted to begin
1981     // at index 0 in the new array.
1982     std::vector<PendingKernel> new_buffer(pending_kernels_.size() * 2);
1983     for (int i = 0; i < pending_kernels_.size(); ++i) {
1984       int j = (i + last_completed_) % pending_kernels_.size();
1985       new_buffer[i] = pending_kernels_[j];
1986     }
1987     last_completed_ = 0;
1988     first_available_ = pending_kernels_.size();
1989     pending_kernels_.swap(new_buffer);
1990     VLOG(1) << "last_completed_=" << last_completed_
1991             << " first_available_=" << first_available_
1992             << " num_pending_=" << num_pending_;
1993   }
1994   DCHECK_NE(first_available_, last_completed_) << "exhausted pending_kernels";
1995 }
1996 
1997 // Called by LastTerminatedCount() when new_value is equal to old_value.  This
1998 // case can occur when an allocation failed and waited for memory to be freed,
1999 // and then, on retry, the safe allocation frontier had not advanced because no
2000 // tracking event had matured.  Maybe GPU progress has stalled waiting on an I/O
2001 // event, or maybe we're tracking at too infrequent an interval.  In any case,
2002 // if the GPU compute queue is actually empty it is safe to advance the safe
2003 // frontier so that this request can allocate from unrestricted (and better
2004 // compacted) memory.  So queue an event on the compute stream to ensure the
2005 // frontier does advance.
2006 void GPUKernelTracker::MaybeQueueProgressEvent() {
2007   mutex_lock l(mu_);
2008   if (num_pending_ == 0) {
2009     uint64 new_count = timing_counter_->next();
2010     RecordQueued(new_count, 1);
2011     em_->ThenExecute(stream_,
2012                      [this, new_count]() { RecordTerminated(new_count); });
2013   }
2014 }
2015 
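// Marks the kernel with the given queued_count as terminated, advances
// last_completed_ past all consecutively terminated entries, publishes the
// new safe frontier to the allocator, and wakes any threads waiting for
// num_pending_ to decrease.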
2016 void GPUKernelTracker::RecordTerminated(uint64 queued_count) {
2017   mutex_lock l(mu_);
2018   VLOG(2) << this << " RecordTerminated queued_count=" << queued_count
2019           << " first_available_=" << first_available_
2020           << " last_completed_=" << last_completed_
2021           << " num_pending_=" << num_pending_ << " LC="
2022           << ((last_completed_ >= 0)
2023                   ? pending_kernels_[last_completed_].queued_count
2024                   : -1);
2025   DCHECK_NE(first_available_, last_completed_);
2026   DCHECK_GT(num_pending_, 0);
2027   // Starting just past the last completed entry, find the entry with
2028   // this queued_count and mark it done.
2029   int index = (last_completed_ + 1) % pending_kernels_.size();
2030   int weight = 1;
2031   while (true) {
2032     if (index == first_available_) {
2033       // This should never happen.
2034       LOG(FATAL) << "Failed to find " << queued_count  // Crash OK
2035                  << " in queue, last_completed_=" << last_completed_
2036                  << " index=" << index
2037                  << " first_available_=" << first_available_
2038                  << " pending_kernels_.size()=" << pending_kernels_.size();
2039     }
2040     if (pending_kernels_[index].queued_count == queued_count) {
2041       pending_kernels_[index].terminated = true;
2042       weight = pending_kernels_[index].weight;
2043       break;
2044     }
2045     index = (index + 1) % pending_kernels_.size();
2046   }
2047   // Next, move last_completed_ forward past all completed kernels.  In theory
2048   // kernels should always complete in queued order, so we should be able to
2049   // advance the completed frontier to the just-completed PendingKernel.  In
2050   // practice we occasionally see the termination callbacks arrive out of
2051   // order, probably because of thread scheduling.  Eventually we may support
2052   // out-of-order completion involving multiple compute streams, so here we
2053   // follow a conservative approach and wait for every single callback to
2054   // arrive before advancing the frontier.
2055   while (true) {
2056     int next_index = (last_completed_ + 1) % pending_kernels_.size();
2057     if (next_index == first_available_) break;
2058     if (pending_kernels_[next_index].terminated) {
2059       last_completed_ = next_index;
2060     } else {
2061       break;
2062     }
2063   }
2064   if (last_completed_ >= 0) {
2065     int64 v = pending_kernels_[last_completed_].queued_count;
2066     last_terminated_count_ = v;
2067     if (allocator_) {
2068       allocator_->SetSafeFrontier(v);
2069     }
2070   }
2071   // Finally, decrease num_pending_ before possibly waking a waiter.
2072   num_pending_ -= weight;
2073   pending_decreased_.notify_all();
2074 }
2075 
2076 }  // namespace tensorflow
2077 
2078 #endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM
2079