1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #ifndef TENSORFLOW_CORE_NCCL_NCCL_MANAGER_H_
16 #define TENSORFLOW_CORE_NCCL_NCCL_MANAGER_H_
17 
18 #ifdef GOOGLE_CUDA
19 
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
22 
23 // TODO(rmlarsen): Get rid of this workaround. "gpu_assert" is defined when
24 // setting EIGEN_USE_THREADS. But when defining EIGEN_USE_THREADS here,
25 // incAtomic and other CUDA specific symbols are no longer recognized.
26 #ifndef gpu_assert
27 #define gpu_assert(x)
28 #endif
29 
30 #include "third_party/nccl/nccl.h"
31 #include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
32 #include "tensorflow/core/framework/tensor.h"
33 #include "tensorflow/core/platform/mutex.h"
34 #include "tensorflow/core/platform/stream_executor.h"
35 
36 namespace tensorflow {
37 
38 // NCCL manager is used to make the asynchronous communicator calls and to
39 // manage the per-device streams used for communication.
40 //
41 // See nccl_ops.cc for example usage, including description of memory
42 // management and stream synchronization.
43 class NcclManager {
44  public:
45   typedef std::function<void(Status)> DoneCallback;
46   NcclManager();
47   ~NcclManager();
48 
49   static NcclManager* instance();
50 
51   // Calls `ncclGetUniqueId` and returns the id as a string.  The returned value
52   // may be shared with other participants on different nodes and passed in to
53   // multi-node collective invocations.
54   string GenerateCommunicatorKey();
55 
56   // A participant in a Collective.
57   struct Participant {
ParticipantParticipant58     Participant(se::StreamExecutor* executor, se::Stream* tensor_stream,
59                 EventMgr* event_mgr, int gpu_device_id, const Tensor* input,
60                 Tensor* output, int global_rank, DoneCallback done_callback)
61         : executor(executor),
62           tensor_stream(tensor_stream),
63           event_mgr(event_mgr),
64           gpu_device_id(gpu_device_id),
65           input(input),
66           output(output),
67           global_rank(global_rank),
68           done_callback(std::move(done_callback)),
69           root(false) {
70       DCHECK(executor != nullptr);
71       DCHECK(event_mgr != nullptr);
72       DCHECK(tensor_stream != nullptr);
73     }
74 
75     // StreamExecutor for the device. Expected to be live for process lifetime.
76     se::StreamExecutor* const executor = nullptr;
77 
78     // `tensor_stream` is the stream that should be waited on to ensure
79     // `input`'s data is available on the GPU for the communication stream to
80     // access. It is also the stream that will use the produced data;
81     // `done_callback` is not called until the next kernel launched on `stream`
82     // would see the data. Owned by the caller, who must keep it live until
83     // `done_callback` is called.
84     se::Stream* const tensor_stream;
85 
86     // EventMgr which polls on executor.
87     // Owned by the caller, who must keep it live until `done_callback` is
88     // called.
89     EventMgr* const event_mgr;
90 
91     const int gpu_device_id;
92 
93     // Owned by the caller, who must keep it live until `done_callback` is
94     // called. Is NULL for participants that only receive data.
95     const Tensor* input;
96 
97     // Owned by the caller, who must keep it live until `done_callback` is
98     // called. Is NULL for participants that only send data.
99     Tensor* output;
100 
101     // Rank across all devices and all nodes.
102     // `global_rank` is not required for single-node collectives.
103     const int global_rank;
104 
105     // The callback which is called at the completion of the NCCL operation.
106     // When called, `output` has been set to the result of the operation. (note:
107     // the stream may not yet have been synced)
108     DoneCallback done_callback;
109 
110     // True if this is the root of the collective, e.g. source of broadcast.
111     bool root;
112   };
113 
114   // Data that provides context for the collective operation, including the
115   // operation key, number of participants, and communicator key.
116   struct Context {
ContextContext117     Context(const string& collective_key, int num_local_devices,
118             int num_global_devices, const string& communicator_key)
119         : collective_key(collective_key),
120           num_local_devices(num_local_devices),
121           num_global_devices(num_global_devices),
122           communicator_key(communicator_key) {}
123 
124     // Unique key for this collective instance
125     const string& collective_key;
126 
127     // Devices local to this node
128     int num_local_devices;
129 
130     // Devices across all nodes
131     int num_global_devices;
132 
133     // In order to use NCCL across nodes, the callee first has to generate a
134     // `communicator_key` via `GenerateCommunicatorKey()` function and share
135     // this with all the other nodes.  Each node should pass in this
136     // `communicator_key` to the `NcclManager` functions.
137     // `communicator_key` is not required for single-node collectives and can be
138     // empty.
139     const string& communicator_key;
140   };
141 
142   // Adds one participant to an all-reduce.
143   void AddToAllReduce(std::unique_ptr<Participant> participant,
144                       const Context& context, ncclRedOp_t reduction_op);
145 
146   // Adds one participant to an all-gather.
147   void AddToAllGather(std::unique_ptr<Participant> participant,
148                       const Context& context);
149 
150   // AddBroadcastSend and AddBroadcastRecv combine to send data from one sender
151   // to all receivers.
152   void AddBroadcastSend(std::unique_ptr<Participant> participant,
153                         const Context& context);
154   void AddBroadcastRecv(std::unique_ptr<Participant> participant,
155                         const Context& context);
156 
157   // AddReduceSend and AddReduceRecv combine to send data from all senders
158   // to one receiver.
159   void AddReduceSend(std::unique_ptr<Participant> participant,
160                      const Context& context, ncclRedOp_t reduction_op);
161   void AddReduceRecv(std::unique_ptr<Participant> participant,
162                      const Context& context, ncclRedOp_t reduction_op);
163 
164   // Signals that the `Collective` corresponding to `key` is ready to launch
165   // across all nodes participating in this multi-node collective operation.
166   //
167   // This should only be called for multi-node collectives; single-node
168   // collectives are implicitly ready when all participants have called Add*
169   // function.
170   void SignalMultiNodeReady(const string& collective_key);
171 
172  private:
173   enum CollectiveType {
174     kAllReduce = 1,
175     kBroadcast = 2,
176     kReduce = 3,
177     kAllGather = 4,
178   };
179   struct Collective;
180   struct Communicator;
181   struct CommunicatorMember;
182   struct NcclStream;
183 
184   // Gets the `Communicator` object that will be used to enqueue NCCL kernels
185   // for `collective`, and returns it via `communicator`.
186   //
187   // This may involve creating CUDA streams and NCCL initialization.  If a NCCL
188   // or CUDA error occurs in the process, this returns an INTERNAL error with
189   // the corresponding NCCL/CUDA error string.
190   Status GetCommunicator(Collective* collective, Communicator** communicator);
191 
192   // Adds a participant device to the local `Collective` instance corresponding
193   // to `collective_key`.  Launches the `Collective` if it is ready, which it
194   // checks by calling `CheckReady()`.  Also performs consistency and sanity
195   // checks before launching.
196   void AddParticipant(std::unique_ptr<Participant> participant,
197                       const Context& context, CollectiveType collective_type,
198                       ncclRedOp_t reduction_op);
199 
200   // If `collective` is ready to run, removes it from the `collectives_` map and
201   // returns the pointer.  Otherwise returns `nullptr`.
202   // Assumes `collective_key` corresponds to `collective`.
203   //
204   // A collective is ready to run when all local participants have called Add*
205   // function, and the collective is signalled globally ready via
206   // `SetMultiNodeReady`.
207   Collective* CheckReady(const string& collective_key, Collective* collective)
208       EXCLUSIVE_LOCKS_REQUIRED(mu_);
209 
210   // Run <collective>.  This calls takes ownership of <collective>.
211   void RunCollective(Collective* collective);
212   void LoopKernelLaunches(NcclStream* stream);
213 
214   mutex mu_;
215 
216   // Maps key to collectives currently being assembled or run.
217   std::unordered_map<string, std::unique_ptr<Collective>> collectives_
218       GUARDED_BY(mu_);
219 
220   // Maps a device to the communication streams that make up its collective.
221   // This is used to share the stream across different communicators that
222   // include the same device.
223   std::map<se::StreamExecutor*, std::vector<std::unique_ptr<NcclStream>>>
224       device_to_comm_streams_ GUARDED_BY(mu_);
225 
226   std::vector<std::unique_ptr<Communicator>> communicators_;
227 
228   TF_DISALLOW_COPY_AND_ASSIGN(NcclManager);
229 };
230 
231 }  // namespace tensorflow
232 
233 #endif  // GOOGLE_CUDA
234 
235 #endif  // TENSORFLOW_CORE_NCCL_NCCL_MANAGER_H_
236