/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#include <vector>

#if GOOGLE_CUDA
#include "third_party/nccl/nccl.h"
#elif TENSORFLOW_USE_ROCM
#include "rocm/include/rccl/rccl.h"
#endif
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/nccl/nccl_manager.h"

namespace tensorflow {
namespace {

// Base class for all communicator ops that use nccl.
//
// About memory management and stream syncing:
// 1. The nccl communicator has a stream for each rank.
// 2. For input tensors to the communicator, the compute stream is passed to
//    the NcclManager which will do a needed
//    communicator_stream.ThenWaitFor(input_tensor_stream).
// 3. The done_callback of the async kernel is not called by the
//    NcclManager until after the communicator kernel is complete. This
//    is enough to a) keep the input tensor data valid for the lifetime of the
//    collective; and b) ensure the data in the output tensor is available
//    when the async op kernel's done callback is called.
class NcclAsyncOpBase : public AsyncOpKernel {
 public:
  explicit NcclAsyncOpBase(OpKernelConstruction* c) : AsyncOpKernel(c) {
    OP_REQUIRES_OK(c, c->GetAttr("num_devices", &num_devices_));
    OP_REQUIRES_OK(c, c->GetAttr("shared_name", &collective_prefix_));
  }

  string GetCollectiveKey(OpKernelContext* c) {
    return strings::StrCat(collective_prefix_, ";", c->step_id(), ";",
                           c->frame_iter().frame_id, ":",
                           c->frame_iter().iter_id);
  }

  int num_devices() const { return num_devices_; }

 private:
  int num_devices_;
  string collective_prefix_;

  TF_DISALLOW_COPY_AND_ASSIGN(NcclAsyncOpBase);
};

class NcclReduceOpBase : public NcclAsyncOpBase {
 public:
  explicit NcclReduceOpBase(OpKernelConstruction* c) : NcclAsyncOpBase(c) {
    string reduction;
    OP_REQUIRES_OK(c, c->GetAttr("reduction", &reduction));
    if (reduction == "min") {
      reduction_op_ = ncclMin;
    } else if (reduction == "max") {
      reduction_op_ = ncclMax;
    } else if (reduction == "sum") {
      reduction_op_ = ncclSum;
    } else if (reduction == "prod") {
      reduction_op_ = ncclProd;
    } else {
      OP_REQUIRES_OK(c,
                     errors::InvalidArgument("Invalid reduction: ", reduction));
    }
  }

  ncclRedOp_t reduction_op() const { return reduction_op_; }

 private:
  ncclRedOp_t reduction_op_;
};
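// A note on the two base classes above: NcclAsyncOpBase carries the attributes
// shared by every kernel in this file (num_devices and the shared_name used to
// build the collective key), and NcclReduceOpBase additionally maps the
// "reduction" attribute onto an ncclRedOp_t. For example, a kernel constructed
// with shared_name="allreduce_0" (a hypothetical name) and run at step 7,
// frame 0, iteration 3 produces the collective key
//   "allreduce_0;7;0:3"
// i.e. "<shared_name>;<step_id>;<frame_id>:<iter_id>"; all <k> per-device
// kernels of one collective must produce the same key so that NcclManager can
// match their participants into a single NCCL operation.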
// To execute a single all-reduce, this kernel is called once for each of the
// <k> devices in the communicator.
class NcclAllReduceOpKernel : public NcclReduceOpBase {
 public:
  explicit NcclAllReduceOpKernel(OpKernelConstruction* c)
      : NcclReduceOpBase(c) {}

  void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
    const Tensor* input = &c->input(0);
    Tensor* output;
    OP_REQUIRES_OK_ASYNC(
        c, c->forward_input_or_allocate_output({0}, 0, input->shape(), &output),
        done);
    auto actual_done = [c, done](Status s) {
      OP_REQUIRES_OK_ASYNC(c, s, done);
      done();
    };

    auto* compute_stream = c->op_device_context()->stream();
    auto* gpu_info = c->device()->tensorflow_gpu_device_info();
    auto participant = absl::make_unique<NcclManager::Participant>(
        compute_stream->parent(), compute_stream, gpu_info, input, output,
        /*global_rank=*/-1, std::move(actual_done));
    NcclManager::instance()->AddToAllReduce(
        std::move(participant),
        {GetCollectiveKey(c),
         /*num_local_devices=*/num_devices(),
         /*num_global_devices=*/num_devices(),
         /*communicator_key=*/"", /*source_rank=*/-1},
        reduction_op());
  }
};
REGISTER_KERNEL_BUILDER(Name("NcclAllReduce").Device(DEVICE_GPU),
                        NcclAllReduceOpKernel);

// To execute a single reduce, this kernel is called once for all but one of
// the <k> devices in the communicator, and NcclReduceRecvKernel is called once
// for the remaining device.
class NcclReduceSendKernel : public NcclReduceOpBase {
 public:
  explicit NcclReduceSendKernel(OpKernelConstruction* c)
      : NcclReduceOpBase(c) {}

  void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
    auto actual_done = [c, done](Status s) {
      OP_REQUIRES_OK_ASYNC(c, s, done);
      done();
    };

    auto* compute_stream = c->op_device_context()->stream();
    auto* gpu_info = c->device()->tensorflow_gpu_device_info();
    auto participant = absl::make_unique<NcclManager::Participant>(
        compute_stream->parent(), compute_stream, gpu_info, &c->input(0),
        /*output=*/nullptr, /*global_rank=*/-1, std::move(actual_done));
    NcclManager::instance()->AddReduceSend(
        std::move(participant),
        {GetCollectiveKey(c),
         /*num_local_devices=*/num_devices(),
         /*num_global_devices=*/num_devices(),
         /*communicator_key=*/"", /*source_rank=*/-1},
        reduction_op());
  }
};
REGISTER_KERNEL_BUILDER(Name("_NcclReduceSend").Device(DEVICE_GPU),
                        NcclReduceSendKernel);
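// The kernels above and below all follow the same pattern: wrap the op's
// compute stream, GPU device info, and tensors in a NcclManager::Participant
// (send-style participants carry a null output, recv-style participants a
// null input) and hand it to the singleton NcclManager together with a
// context in which num_local_devices == num_global_devices == num_devices()
// and the communicator_key is empty, i.e. the single-process configuration.
// global_rank and source_rank are passed as -1; on this path the local ranks
// are worked out by NcclManager rather than supplied by the kernels.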
// To execute a single reduce, this kernel is called once for one device, and
// NcclReduceSendKernel is called for all other <k-1> devices in the
// communicator.
class NcclReduceRecvKernel : public NcclReduceOpBase {
 public:
  explicit NcclReduceRecvKernel(OpKernelConstruction* c)
      : NcclReduceOpBase(c) {}

  void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
    const Tensor* input = &c->input(0);
    Tensor* output;
    OP_REQUIRES_OK_ASYNC(c, c->allocate_output(0, input->shape(), &output),
                         done);

    auto actual_done = [c, done](Status s) {
      OP_REQUIRES_OK_ASYNC(c, s, done);
      done();
    };

    auto* compute_stream = c->op_device_context()->stream();
    auto* gpu_info = c->device()->tensorflow_gpu_device_info();
    auto participant = absl::make_unique<NcclManager::Participant>(
        compute_stream->parent(), compute_stream, gpu_info, input, output,
        /*global_rank=*/-1, std::move(actual_done));
    NcclManager::instance()->AddReduceRecv(
        std::move(participant),
        {GetCollectiveKey(c),
         /*num_local_devices=*/num_devices(),
         /*num_global_devices=*/num_devices(),
         /*communicator_key=*/"", /*source_rank=*/-1},
        reduction_op());
  }

 private:
  ncclRedOp_t reduction_op_;
};
REGISTER_KERNEL_BUILDER(Name("_NcclReduceRecv").Device(DEVICE_GPU),
                        NcclReduceRecvKernel);

// To execute a single broadcast, this kernel is called once for one device,
// and NcclBroadcastRecvKernel is called for all other <k-1> devices in the
// communicator.
class NcclBroadcastSendKernel : public NcclAsyncOpBase {
 public:
  explicit NcclBroadcastSendKernel(OpKernelConstruction* c)
      : NcclAsyncOpBase(c) {}

  void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
    auto actual_done = [c, done](Status s) {
      OP_REQUIRES_OK_ASYNC(c, s, done);
      done();
    };

    auto* compute_stream = c->op_device_context()->stream();
    auto* gpu_info = c->device()->tensorflow_gpu_device_info();
    auto participant = absl::make_unique<NcclManager::Participant>(
        compute_stream->parent(), compute_stream, gpu_info, &c->input(0),
        /*output=*/nullptr, /*global_rank=*/-1, std::move(actual_done));
    NcclManager::instance()->AddBroadcastSend(
        std::move(participant), {GetCollectiveKey(c),
                                 /*num_local_devices=*/num_devices(),
                                 /*num_global_devices=*/num_devices(),
                                 /*communicator_key=*/"", /*source_rank=*/-1});
  }
};
REGISTER_KERNEL_BUILDER(Name("_NcclBroadcastSend").Device(DEVICE_GPU),
                        NcclBroadcastSendKernel);
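// Note that the broadcast pair is asymmetric in what it knows about the tensor
// being transferred: the send kernel above has the data (and therefore its
// shape) as a regular input, while the receive kernel below has no data input
// at all and must be told the shape explicitly. That is why _NcclBroadcastRecv
// takes a host-memory "shape" tensor and allocates its output from it before
// joining the collective.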
// To execute a single broadcast, this kernel is called once for all but one of
// the <k> devices in the communicator, and NcclBroadcastSendKernel is called
// once for the remaining device.
class NcclBroadcastRecvKernel : public NcclAsyncOpBase {
 public:
  explicit NcclBroadcastRecvKernel(OpKernelConstruction* c)
      : NcclAsyncOpBase(c) {}

  void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
    const Tensor& shape_t = c->input(0);
    TensorShape shape;
    OP_REQUIRES_OK_ASYNC(
        c, TensorShapeUtils::MakeShape(shape_t.vec<int32>(), &shape), done);
    Tensor* output;
    OP_REQUIRES_OK_ASYNC(c, c->allocate_output(0, shape, &output), done);

    auto actual_done = [c, done](Status s) {
      OP_REQUIRES_OK_ASYNC(c, s, done);
      done();
    };

    auto* compute_stream = c->op_device_context()->stream();
    auto* gpu_info = c->device()->tensorflow_gpu_device_info();
    auto participant = absl::make_unique<NcclManager::Participant>(
        compute_stream->parent(), compute_stream, gpu_info,
        /*input=*/nullptr, output, /*global_rank=*/-1, std::move(actual_done));
    NcclManager::instance()->AddBroadcastRecv(
        std::move(participant), {GetCollectiveKey(c),
                                 /*num_local_devices=*/num_devices(),
                                 /*num_global_devices=*/num_devices(),
                                 /*communicator_key=*/"", /*source_rank=*/-1});
  }
};
REGISTER_KERNEL_BUILDER(
    Name("_NcclBroadcastRecv").Device(DEVICE_GPU).HostMemory("shape"),
    NcclBroadcastRecvKernel);

// Define stub kernels for the ops that get replaced post placement.
class NcclStubKernel : public AsyncOpKernel {
 public:
  explicit NcclStubKernel(OpKernelConstruction* c) : AsyncOpKernel(c) {}
  void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
    c->SetStatus(errors::Unimplemented(
        "This op should be replaced during graph optimization."));
    done();
  }
};
REGISTER_KERNEL_BUILDER(Name("NcclBroadcast").Device(DEVICE_GPU),
                        NcclStubKernel);
REGISTER_KERNEL_BUILDER(Name("NcclReduce").Device(DEVICE_GPU), NcclStubKernel);

}  // namespace
}  // namespace tensorflow

#endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM