/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#if GOOGLE_CUDA || TENSORFLOW_USE_ROCM

#include <vector>

#if GOOGLE_CUDA
#include "third_party/nccl/nccl.h"
#elif TENSORFLOW_USE_ROCM
#include "rocm/include/rccl/rccl.h"
#endif
#include "absl/memory/memory.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/nccl/nccl_manager.h"

namespace tensorflow {
namespace {

// Base class for all communicator ops that use nccl.
//
// About memory management and stream syncing:
// 1. The nccl communicator has a stream for each rank.
// 2. For input tensors to the communicator, the compute stream is passed to
//    the NcclManager, which performs the needed
//    communicator_stream.ThenWaitFor(input_tensor_stream).
// 3. The done_callback of the async kernel is not called by the
//    NcclManager until after the communicator kernel is complete. This
//    is enough to a) keep the input tensor data valid for the lifetime of the
//    collective; and b) ensure the data in the output tensor is available
//    when the async op kernel's done callback is called.
class NcclAsyncOpBase : public AsyncOpKernel {
 public:
  explicit NcclAsyncOpBase(OpKernelConstruction* c) : AsyncOpKernel(c) {
    OP_REQUIRES_OK(c, c->GetAttr("num_devices", &num_devices_));
    OP_REQUIRES_OK(c, c->GetAttr("shared_name", &collective_prefix_));
  }

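  // Returns a key identifying this collective instance. It combines the op's
  // shared_name with the current step id and frame/iteration, so the kernels
  // launched on each participating device compute the same key and the
  // NcclManager can group them into one collective.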
  string GetCollectiveKey(OpKernelContext* c) {
    return strings::StrCat(collective_prefix_, ";", c->step_id(), ";",
                           c->frame_iter().frame_id, ":",
                           c->frame_iter().iter_id);
  }

  int num_devices() const { return num_devices_; }

 private:
  int num_devices_;
  string collective_prefix_;

  TF_DISALLOW_COPY_AND_ASSIGN(NcclAsyncOpBase);
};

class NcclReduceOpBase : public NcclAsyncOpBase {
 public:
  explicit NcclReduceOpBase(OpKernelConstruction* c) : NcclAsyncOpBase(c) {
    string reduction;
    OP_REQUIRES_OK(c, c->GetAttr("reduction", &reduction));
    if (reduction == "min") {
      reduction_op_ = ncclMin;
    } else if (reduction == "max") {
      reduction_op_ = ncclMax;
    } else if (reduction == "sum") {
      reduction_op_ = ncclSum;
    } else if (reduction == "prod") {
      reduction_op_ = ncclProd;
    } else {
      OP_REQUIRES_OK(c,
                     errors::InvalidArgument("Invalid reduction: ", reduction));
    }
  }

  ncclRedOp_t reduction_op() const { return reduction_op_; }

 private:
  ncclRedOp_t reduction_op_;
};

// To execute a single all-reduce, this kernel is called once for each of the
// <k> devices in the communicator.
class NcclAllReduceOpKernel : public NcclReduceOpBase {
 public:
  explicit NcclAllReduceOpKernel(OpKernelConstruction* c)
      : NcclReduceOpBase(c) {}

  void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
    const Tensor* input = &c->input(0);
    Tensor* output;
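    // Forward the input buffer to the output when possible; NCCL all-reduce
    // supports operating in place.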
    OP_REQUIRES_OK_ASYNC(
        c, c->forward_input_or_allocate_output({0}, 0, input->shape(), &output),
        done);
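    // Invoked by the NcclManager after the communicator kernel has completed
    // (see the NcclAsyncOpBase comment above), at which point the output data
    // is available.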
    auto actual_done = [c, done](Status s) {
      OP_REQUIRES_OK_ASYNC(c, s, done);
      done();
    };

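    // The participant bundles this rank's stream executor, compute stream,
    // and input/output buffers; the NcclManager groups participants sharing
    // the same collective key into a single all-reduce.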
    auto* compute_stream = c->op_device_context()->stream();
    auto* gpu_info = c->device()->tensorflow_gpu_device_info();
    auto participant = absl::make_unique<NcclManager::Participant>(
        compute_stream->parent(), compute_stream, gpu_info, input, output,
        /*global_rank=*/-1, std::move(actual_done));
    NcclManager::instance()->AddToAllReduce(
        std::move(participant),
        {GetCollectiveKey(c),
         /*num_local_devices=*/num_devices(),
         /*num_global_devices=*/num_devices(),
         /*communicator_key=*/"", /*source_rank=*/-1},
        reduction_op());
  }
};
REGISTER_KERNEL_BUILDER(Name("NcclAllReduce").Device(DEVICE_GPU),
                        NcclAllReduceOpKernel);

// To execute a single reduce, this kernel is called once for all but one of
// the <k> devices in the communicator, and NcclReduceRecvKernel is called once
// for the remaining device.
class NcclReduceSendKernel : public NcclReduceOpBase {
 public:
  explicit NcclReduceSendKernel(OpKernelConstruction* c)
      : NcclReduceOpBase(c) {}

  void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
    auto actual_done = [c, done](Status s) {
      OP_REQUIRES_OK_ASYNC(c, s, done);
      done();
    };

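    // The send side contributes only an input; the reduced result is produced
    // on the device running NcclReduceRecvKernel, so no output is allocated
    // here and the participant's output pointer is null.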
    auto* compute_stream = c->op_device_context()->stream();
    auto* gpu_info = c->device()->tensorflow_gpu_device_info();
    auto participant = absl::make_unique<NcclManager::Participant>(
        compute_stream->parent(), compute_stream, gpu_info, &c->input(0),
        /*output=*/nullptr, /*global_rank=*/-1, std::move(actual_done));
    NcclManager::instance()->AddReduceSend(
        std::move(participant),
        {GetCollectiveKey(c),
         /*num_local_devices=*/num_devices(),
         /*num_global_devices=*/num_devices(),
         /*communicator_key=*/"", /*source_rank=*/-1},
        reduction_op());
  }
};
REGISTER_KERNEL_BUILDER(Name("_NcclReduceSend").Device(DEVICE_GPU),
                        NcclReduceSendKernel);

// To execute a single reduce, this kernel is called once on one device, and
// NcclReduceSendKernel is called once on each of the other <k-1> devices in
// the communicator.
class NcclReduceRecvKernel : public NcclReduceOpBase {
 public:
  explicit NcclReduceRecvKernel(OpKernelConstruction* c)
      : NcclReduceOpBase(c) {}

  void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
    const Tensor* input = &c->input(0);
    Tensor* output;
    OP_REQUIRES_OK_ASYNC(c, c->allocate_output(0, input->shape(), &output),
                         done);

    auto actual_done = [c, done](Status s) {
      OP_REQUIRES_OK_ASYNC(c, s, done);
      done();
    };

    auto* compute_stream = c->op_device_context()->stream();
    auto* gpu_info = c->device()->tensorflow_gpu_device_info();
    auto participant = absl::make_unique<NcclManager::Participant>(
        compute_stream->parent(), compute_stream, gpu_info, input, output,
        /*global_rank=*/-1, std::move(actual_done));
    NcclManager::instance()->AddReduceRecv(
        std::move(participant),
        {GetCollectiveKey(c),
         /*num_local_devices=*/num_devices(),
         /*num_global_devices=*/num_devices(),
         /*communicator_key=*/"", /*source_rank=*/-1},
        reduction_op());
  }

};
REGISTER_KERNEL_BUILDER(Name("_NcclReduceRecv").Device(DEVICE_GPU),
                        NcclReduceRecvKernel);

// To execute a single broadcast, this kernel is called once on one device, and
// NcclBroadcastRecvKernel is called once on each of the other <k-1> devices in
// the communicator.
class NcclBroadcastSendKernel : public NcclAsyncOpBase {
 public:
  explicit NcclBroadcastSendKernel(OpKernelConstruction* c)
      : NcclAsyncOpBase(c) {}

  void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
    auto actual_done = [c, done](Status s) {
      OP_REQUIRES_OK_ASYNC(c, s, done);
      done();
    };

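    // The broadcast source provides only the input tensor; each receiving
    // device allocates its own output of the shape given to
    // NcclBroadcastRecvKernel.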
    auto* compute_stream = c->op_device_context()->stream();
    auto* gpu_info = c->device()->tensorflow_gpu_device_info();
    auto participant = absl::make_unique<NcclManager::Participant>(
        compute_stream->parent(), compute_stream, gpu_info, &c->input(0),
        /*output=*/nullptr, /*global_rank=*/-1, std::move(actual_done));
    NcclManager::instance()->AddBroadcastSend(
        std::move(participant), {GetCollectiveKey(c),
                                 /*num_local_devices=*/num_devices(),
                                 /*num_global_devices=*/num_devices(),
                                 /*communicator_key=*/"", /*source_rank=*/-1});
  }
};
REGISTER_KERNEL_BUILDER(Name("_NcclBroadcastSend").Device(DEVICE_GPU),
                        NcclBroadcastSendKernel);

// To execute a single broadcast, this kernel is called once for all but one of
// the <k> devices in the communicator, and NcclBroadcastSendKernel is called
// once for the remaining device.
class NcclBroadcastRecvKernel : public NcclAsyncOpBase {
 public:
  explicit NcclBroadcastRecvKernel(OpKernelConstruction* c)
      : NcclAsyncOpBase(c) {}

  void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
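    // The output shape arrives as an int32 vector in host memory (note the
    // HostMemory("shape") constraint in the kernel registration below).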
    const Tensor& shape_t = c->input(0);
    TensorShape shape;
    OP_REQUIRES_OK_ASYNC(
        c, TensorShapeUtils::MakeShape(shape_t.vec<int32>(), &shape), done);
    Tensor* output;
    OP_REQUIRES_OK_ASYNC(c, c->allocate_output(0, shape, &output), done);

    auto actual_done = [c, done](Status s) {
      OP_REQUIRES_OK_ASYNC(c, s, done);
      done();
    };

    auto* compute_stream = c->op_device_context()->stream();
    auto* gpu_info = c->device()->tensorflow_gpu_device_info();
    auto participant = absl::make_unique<NcclManager::Participant>(
        compute_stream->parent(), compute_stream, gpu_info,
        /*input=*/nullptr, output, /*global_rank=*/-1, std::move(actual_done));
    NcclManager::instance()->AddBroadcastRecv(
        std::move(participant), {GetCollectiveKey(c),
                                 /*num_local_devices=*/num_devices(),
                                 /*num_global_devices=*/num_devices(),
                                 /*communicator_key=*/"", /*source_rank=*/-1});
  }
};
REGISTER_KERNEL_BUILDER(
    Name("_NcclBroadcastRecv").Device(DEVICE_GPU).HostMemory("shape"),
    NcclBroadcastRecvKernel);

// Define stub kernels for the ops that get replaced post placement.
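// NcclReduce and NcclBroadcast are rewritten after placement into the
// _NcclReduceSend/_NcclReduceRecv and _NcclBroadcastSend/_NcclBroadcastRecv
// kernels above; these stubs only report an error if that rewrite did not run.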
class NcclStubKernel : public AsyncOpKernel {
 public:
  explicit NcclStubKernel(OpKernelConstruction* c) : AsyncOpKernel(c) {}
  void ComputeAsync(OpKernelContext* c, DoneCallback done) override {
    c->SetStatus(errors::Unimplemented(
        "This op should be replaced during graph optimization."));
    done();
  }
};
REGISTER_KERNEL_BUILDER(Name("NcclBroadcast").Device(DEVICE_GPU),
                        NcclStubKernel);
REGISTER_KERNEL_BUILDER(Name("NcclReduce").Device(DEVICE_GPU), NcclStubKernel);

}  // namespace
}  // namespace tensorflow

#endif  // GOOGLE_CUDA || TENSORFLOW_USE_ROCM