1 /* Copyright 2019 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #include "tensorflow/core/kernels/collective_nccl_reducer.h"
16 
17 #ifdef GOOGLE_CUDA
18 
#include "tensorflow/core/common_runtime/collective_util.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/nccl/nccl_manager.h"
21 
22 namespace tensorflow {
23 namespace {
NcclCollectiveKey(const string & exec_key,int step_id)24 string NcclCollectiveKey(const string& exec_key, int step_id) {
25   return strings::StrCat(exec_key, ":", step_id);
26 }
27 }  // namespace
28 
NcclReducer()29 NcclReducer::NcclReducer() : col_ctx_(nullptr), col_params_(nullptr) {}
30 
InitializeCollectiveParams(CollectiveParams * col_params)31 Status NcclReducer::InitializeCollectiveParams(CollectiveParams* col_params) {
32   if (col_params->instance.type != REDUCTION_COLLECTIVE ||
33       col_params->instance.impl_details.collective_name != "NcclReduce") {
34     return errors::Internal("Unexpected collective type ",
35                             col_params->instance.type, " expected ",
36                             REDUCTION_COLLECTIVE, "; or collective name ",
37                             col_params->instance.impl_details.collective_name,
38                             " expected NcclReduce");
39   } else {
40     return Status::OK();
41   }
42 }
43 
InitializeCollectiveContext(CollectiveContext * col_ctx)44 Status NcclReducer::InitializeCollectiveContext(CollectiveContext* col_ctx) {
45   col_ctx_ = col_ctx;
46   col_params_ = &col_ctx->col_params;
47   return collective_util::InitializeDeviceAndLocality(
48       col_ctx->dev_mgr, col_ctx->device_name, &col_ctx->device,
49       &col_ctx->device_locality);
50 }
51 
InitializeInstanceBeforeGroupDiscovery(CollectiveParams * col_params)52 Status NcclReducer::InitializeInstanceBeforeGroupDiscovery(
53     CollectiveParams* col_params) {
54   if (col_params->default_rank == 0 && col_params->group.num_tasks > 1) {
55     col_params->instance.communicator_key =
56         NcclManager::instance()->GenerateCommunicatorKey();
57   }
58   return Status::OK();
59 }
60 
ReductionOp(const string & merge_op,ncclRedOp_t * reduction_op)61 Status ReductionOp(const string& merge_op, ncclRedOp_t* reduction_op) {
62   if (merge_op == "Add") {
63     *reduction_op = ncclSum;
64     return Status::OK();
65   } else if (merge_op == "Mul") {
66     *reduction_op = ncclProd;
67     return Status::OK();
68   } else {
69     return errors::Internal("Expected merge_op to be either Add or Mul, found ",
70                             merge_op);
71   }
72 }
73 
Run(StatusCallback done)74 void NcclReducer::Run(StatusCallback done) {
75   ncclRedOp_t reduction_op;
76   Status s = ReductionOp(col_params_->merge_op->type_string(), &reduction_op);
77   if (!s.ok()) {
78     done(s);
79     return;
80   }
81 
82   Tensor group_size;
83   Notification group_size_ready;
84   Status group_size_status;
85   if (col_params_->final_op) {
86     // Create an on-device scalar value from group_size_.
87     // TODO(ayushd, tucker): avoid this copy by either reusing across
88     // invocations or providing the scalar to the kernel in host memory.
89     Tensor group_size_val(col_ctx_->output->dtype(), TensorShape({}));
90     switch (col_ctx_->output->dtype()) {
91       case DT_FLOAT:
92         group_size_val.scalar<float>()() = col_params_->group.group_size;
93         break;
94       case DT_DOUBLE:
95         group_size_val.scalar<double>()() = col_params_->group.group_size;
96         break;
97       case DT_INT32:
98         group_size_val.scalar<int32>()() = col_params_->group.group_size;
99         break;
100       case DT_INT64:
101         group_size_val.scalar<int64>()() = col_params_->group.group_size;
102         break;
103       default:
104         done(errors::Internal("Unsupported type ", col_ctx_->output->dtype()));
105         return;
106     }
107     group_size = Tensor(
108         col_ctx_->device->GetAllocator(col_ctx_->op_ctx->input_alloc_attr(0)),
109         col_ctx_->output->dtype(), TensorShape({}));
110     DeviceContext* op_dev_ctx = col_ctx_->op_ctx->op_device_context();
111     // Enqueue copy on gpu stream.
112     op_dev_ctx->CopyCPUTensorToDevice(
113         &group_size_val, col_ctx_->device, &group_size,
114         [&group_size_ready, &group_size_status](const Status& s) {
115           group_size_status = s;
116           group_size_ready.Notify();
117         });
118   } else {
119     group_size_ready.Notify();
120   }
121 
122   Notification nccl_done;
123   Status nccl_status;
124   auto* compute_stream = col_ctx_->op_ctx->op_device_context()->stream();
125   auto* gpu_info = col_ctx_->op_ctx->device()->tensorflow_gpu_device_info();
126   // `AddToAllReduce` performs consistency checks for the NCCL call and enqueues
127   // the `Participant` struct locally.  When all local participants with this
128   // `nccl_collective_key` have called `AddToAllReduce` and
129   // `SignalMultiNodeReady`, all devices at this worker are ready to process
130   // this NCCL op.
131   //
132   // The `NcclManager` uses a dedicated CUDA stream for NCCL kernels.  At this
133   // point, it synchronizes the NCCL stream with the compute stream, and then
134   // enqueues the NCCL kernel on the NCCL stream.
135   const int num_global_devices = col_params_->group.group_size;
136   const int num_local_devices = col_params_->instance.num_devices_per_task.at(
137       col_params_->instance.task_names[col_params_->default_rank]);
138   const string nccl_collective_key =
139       NcclCollectiveKey(col_ctx_->exec_key, col_ctx_->step_id);
140   auto done_callback = [&nccl_done, &nccl_status](const Status& s) {
141     nccl_status = s;
142     nccl_done.Notify();
143   };
144   auto participant = absl::make_unique<NcclManager::Participant>(
145       compute_stream->parent(), compute_stream, gpu_info->event_mgr,
146       gpu_info->gpu_id, col_ctx_->input, col_ctx_->output,
147       col_params_->default_rank, std::move(done_callback));
148   VLOG(1) << "NcclReducer calling NcclManager::AddToAllReduce num_tasks "
149           << col_params_->group.num_tasks << " current task "
150           << col_params_->instance.task_names[col_params_->default_rank]
151           << " num local devices " << num_local_devices
152           << " num global devices " << num_global_devices << " device "
153           << col_ctx_->device_name << " instance "
154           << col_params_->instance.instance_key;
155   NcclManager::instance()->AddToAllReduce(
156       std::move(participant),
157       {nccl_collective_key, num_local_devices, num_global_devices,
158        col_params_->instance.communicator_key},
159       reduction_op);
160 
161   // NOTE(ayushd): We need to synchronize NCCL launches across nodes to prevent
162   // deadlocks.  In the current implementation, we define a deterministic
163   // sequential launch order between potentially concurrent collective instances
164   // by introducing control information during static graph analysis in
165   // graph/collective_order.cc.  This can be either in the form of explicit
166   // control edges or via `wait_for` attribute on the collective op.
167   //
168   // The other end of the design spectrum would have a distinguished node
169   // dynamically signal the next collective to launch to all other participants.
170   // This has higher degree of runtime coordination, but it may be able to
171   // achieve better performance if the (arbitrary) static execution order
172   // assigned in the first approach turns out to not be good from a scheduling
173   // perspective.  e.g. consider a graph in which c1, c2, and c3 are three
174   // concurrent collective instances, and the static ordering assigns c1 -> c2
175   // -> c3.  In practice, it could turn out that c3 is always ready to execute
176   // before c1 or c2.
177   //
178   // `WaitForDependencies` may block if the collective instances on which this
179   // op depends have not yet launched.  When this function returns, this op is
180   // ready to go.
181   col_ctx_->col_exec->WaitForDependencies(*col_params_);
182   NcclManager::instance()->SignalMultiNodeReady(nccl_collective_key);
183   // When all devices at this worker have called `SignalMultiNodeReady`, the
184   // `NcclManager` will enqueue the NCCL kernel on the NCCL stream.  Thus the
185   // implementation of `Launched` keeps track of the number of devices that have
186   // launched.
187   col_ctx_->col_exec->Launched(*col_params_);
188 
189   // Wait for nccl op and group_size copy to succeed, then do final_op.
190   group_size_ready.WaitForNotification();
191   nccl_done.WaitForNotification();
192   Status final_status =
193       group_size_status.ok() ? nccl_status : group_size_status;
194   if (final_status.ok() && col_params_->final_op) {
195     final_status = collective_util::ComputeBinOp(
196         col_ctx_->op_ctx, col_ctx_->op_params, col_ctx_->device,
197         col_params_->final_op.get(), col_ctx_->output, &group_size);
198   }
199   done(final_status);
200 }
201 
// NOTE(review): REGISTER_COLLECTIVE is defined outside this file; presumably it
// registers NcclReducer under the name "NcclReduce", the same collective name
// that InitializeCollectiveParams() checks for -- confirm against the macro.
REGISTER_COLLECTIVE(NcclReduce, NcclReducer);
203 
204 }  // namespace tensorflow
205 
206 #endif  // GOOGLE_CUDA
207