1# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15# pylint: disable=g-short-docstring-punctuation
16"""## Communicating Between Processes with MPI
17
18TensorFlow natively provides inter-device communication through send and
19receive ops and inter-node communication through Distributed TensorFlow, based
20on the same send and receive abstractions. On HPC clusters where Infiniband or
21other high-speed node interconnects are available, these can end up being
22insufficient for synchronous data-parallel training (without asynchronous
23gradient descent). This module implements a variety of MPI ops which can take
24advantage of hardware-specific MPI libraries for efficient communication.
25
26In order to use this module, TensorFlow must be built with an MPI library,
27which can be provided to the `./configure` script at build time. As a user of
28TensorFlow, you will need to build TensorFlow yourself to select the MPI
29library to use; to do so, follow the [instructions for building TensorFlow from
30source](https://www.tensorflow.org/get_started/os_setup#installing_from_sources).
31
32### Utility Ops
33
34In addition to reductions and gathers, this module provides utility operations
35for detecting the running MPI configuration.
36
37Example:
38
39```python
40import tensorflow.contrib.mpi_collectives as mpi
41
42# Use `mpi.Session` instead of `tf.Session`
43with mpi.Session() as session:
44    rank = session.run(mpi.rank())
45    print("My MPI Rank:", rank)
46
47    if rank == 0:
48        print("MPI Size:", session.run(mpi.size()))
49```
50
51@@init
52@@size
53@@rank
54@@local_rank
55
56### Ring Allreduce and Allgather
57
58When summing or averaging tensors across many processes, communication can
59easily become a bottleneck. A naive implementation will send all the tensor
60values to the same process, perform the reduction, and then broadcast the
61values back to all other processes, effectively creating a synchronous
62parameter server in one process. However, the process responsible for
63performing the reduction will have to receive and send a massive amount of data
64which scales with the number of processes *and* the number of parameters in the
65model.
66
67Instead of centralizing the reduction and having one primary reducer, we can
68implement a distributed allreduce or allgather. A bandwidth-optimal allreduce
69will end up sending 2(N - 1) values for every value in the input tensor,
70and can be implemented with a ring allreduce [1]. (Intuitively, a linear reduce
71requires at least (N - 1) sends between the different nodes, and a broadcast of
the result also requires (N - 1) sends, for a total of 2(N - 1); these two
73steps cannot be combined in a clever way to reduce the number of required
74sends.) This module implements bandwidth-optimal ring allreduce and ring
75allgather operations using MPI; by choosing a hardware-appropriate MPI
76implementation (such as OpenMPI with CUDA-IPC support), you can train large
77models with synchronous gradient descent with minimal communication overhead.
78
79In addition to the `allreduce` and `allgather` functions, a convenience
80`DistributedOptimizer` wrapper is provided to simplify using these functions
81for reducing model gradients.
82
83Example:
84
85```python
86import tensorflow as tf
87from tensorflow.contrib import mpi_collectives as mpi
88
89# Construct a simple linear regression model to optimize
90W = tf.get_variable("W", shape=[20, 1], dtype=tf.float32)
91B = tf.get_variable("B", shape=[1, 1], dtype=tf.float32)
inputs = tf.placeholder(tf.float32, shape=[None, 20], name="Inputs")
outputs = tf.placeholder(tf.float32, shape=[None, 1], name="Outputs")
94loss = tf.nn.l2_loss(tf.matmul(inputs, W) + B - outputs)
95
96# Training using MPI allreduce with DistributedOptimizer
97optimizer = mpi.DistributedOptimizer(tf.train.AdamOptimizer())
98train = optimizer.minimize(loss)
99
100# Average loss over all ranks, for printing.
101# Do not pass this to an optimizer!
102avg_loss = mpi.allreduce(loss)
103
104# On different ranks, feed different input data.
105with mpi.Session() as session:
106    rank = session.run(mpi.rank())
107    batch_inputs, batch_outputs = construct_batch_for_rank(rank)
108    feed_dict = {inputs: batch_inputs, outputs: batch_outputs}
109    _, l = session.run([train, avg_loss], feed_dict=feed_dict)
110    print("Average Loss:", l)
111```
112
113[1] Patarasuk, Pitch and Yuan, Xin. "Bandwidth Optimal All-reduce Algorithms
114for Clusters of Workstations".
115
116@@Session
117@@DistributedOptimizer
118@@allreduce
119@@allgather
120"""
121
122from __future__ import absolute_import
123from __future__ import division
124from __future__ import print_function
125
126import tensorflow as tf
127
128from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import init
129from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import size
130from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import rank
131from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import local_rank
132from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import allgather
133from tensorflow.contrib.mpi_collectives.python.ops.mpi_ops import _allreduce
134
135
def allreduce(tensor, average=True):
  """Sum or average a tensor across all MPI ranks.

  A dense input is handed to the `_allreduce` op. A `tf.IndexedSlices` input
  is handled differently: its values and its indices are each passed through
  `allgather`, and the gathered pieces are wrapped in a new
  `tf.IndexedSlices` with the original `dense_shape`, which stands in for an
  allreduce on the tensor the slices represent.

  Args:
    tensor: tf.Tensor, tf.Variable, or tf.IndexedSlices to reduce.
      The shape of the input must be identical across all ranks.
    average: If True, computes the average over all ranks by dividing by the
      MPI size. Otherwise, computes the sum over all ranks.

  Returns:
    A `tf.IndexedSlices` when `tensor` is a `tf.IndexedSlices`; otherwise
    the output of `_allreduce`, divided by the MPI size when `average` is
    true.
  """
  if not isinstance(tensor, tf.IndexedSlices):
    # Dense path: one allreduce, then an optional division by the MPI size
    # (cast to the tensor's dtype so the division is type-compatible).
    world_size = tf.cast(size(), tensor.dtype)
    total = _allreduce(tensor)
    if average:
      return tf.div(total, world_size)
    return total

  # Sparse path: use two allgathers instead of an allreduce.
  world_size = tf.cast(size(), tensor.values.dtype)
  gathered_values = allgather(tensor.values)
  gathered_indices = allgather(tensor.indices)

  # Averaging only rescales the values; the gathered indices are unchanged.
  if average:
    gathered_values = tf.div(gathered_values, world_size)

  return tf.IndexedSlices(gathered_values, gathered_indices,
                          dense_shape=tensor.dense_shape)
167
168
class DistributedOptimizer(tf.train.Optimizer):
  """Optimizer wrapper that allreduces gradients across MPI ranks.

  Gradients are computed through the base `tf.train.Optimizer` machinery,
  passed through `allreduce` (averaging by default), and then applied by
  delegating the private update hooks to the wrapped optimizer.
  """

  def __init__(self, optimizer, name=None, use_locking=False):
    """Construct a new DistributedOptimizer around an existing optimizer.

    The wrapped optimizer is used under the hood for computing
    single-process gradient values and for applying gradient updates after
    the gradient values have been averaged across all the MPI ranks.

    Args:
      optimizer: Optimizer to use for computing gradients and applying
        updates.
      name: Optional name prefix for the operations created when applying
        gradients. Defaults to "Distributed" followed by the provided
        optimizer type.
      use_locking: Whether to use locking when updating variables. See
        Optimizer.__init__ for more info.
    """
    if name is None:
      name = "Distributed{}".format(type(optimizer).__name__)

    self._optimizer = optimizer
    super(DistributedOptimizer, self).__init__(
        name=name, use_locking=use_locking)

  def compute_gradients(self, *args, **kwargs):
    """Compute gradients of all trainable variables and allreduce them.

    See Optimizer.compute_gradients() for more info on the arguments.

    In DistributedOptimizer, compute_gradients() is overridden to also
    allreduce the gradients before returning them.

    Returns:
      A list of (gradient, variable) pairs in the order produced by
      Optimizer.compute_gradients(). Each non-None gradient is replaced by
      the result of `allreduce(gradient)`; None gradients are returned
      unchanged.
    """
    gradients = (super(DistributedOptimizer, self)
                 .compute_gradients(*args, **kwargs))
    # Optimizer.compute_gradients() may return None as the gradient for a
    # variable that does not influence the loss. allreduce() cannot handle
    # None (it reads the tensor's dtype), so such entries are passed through.
    # NOTE(review): assumes every rank builds the same graph, so the same
    # variables have None gradients on every rank -- confirm for your setup.
    return [(None if gradient is None else allreduce(gradient), var)
            for (gradient, var) in gradients]

  def _apply_dense(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._apply_dense(*args, **kwargs)

  def _apply_sparse(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._apply_sparse(*args, **kwargs)

  def _apply_sparse_duplicate_indices(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._apply_sparse_duplicate_indices(*args,
                                                           **kwargs)

  def _prepare(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._prepare(*args, **kwargs)

  def _create_slots(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._create_slots(*args, **kwargs)

  def _valid_dtypes(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._valid_dtypes(*args, **kwargs)

  def _finish(self, *args, **kwargs):
    """Calls this same method on the underlying optimizer."""
    return self._optimizer._finish(*args, **kwargs)
234
235
class Session(tf.Session):
  """A `tf.Session` subclass that initializes MPI as soon as it is created.

  It is meant for running copies of the same graph distributed across
  different MPI nodes. Compared with a plain `tf.Session`, the visible
  difference in this class is that the constructor runs the MPI `init()` op
  in the session's graph immediately after the session is set up.
  """

  def __init__(self, target='', graph=None, config=None):
    """Creates a new TensorFlow MPI session and runs the MPI init op.

    The arguments are forwarded to `tf.Session.__init__`. Afterwards the
    `init()` op is created in the session's graph and run once.

    NOTE(review): earlier documentation states that an MPI session may only
    use a single GPU (chosen before the session is initialized) and a single
    graph evaluation thread. This constructor does not set those options
    itself; `config` is passed through unchanged, so callers presumably need
    to configure them -- confirm against the MPI op implementation.

    If no `graph` argument is specified when constructing the session,
    the default graph will be launched in the session. If you are
    using more than one graph (created with `tf.Graph()`) in the same
    process, you will have to use different sessions for each graph,
    but each graph can be used in multiple sessions. In this case, it
    is often clearer to pass the graph to be launched explicitly to
    the session constructor.

    Args:
      target: (Optional.) The execution engine to connect to.
      graph: (Optional.) The `Graph` to be launched (described above).
      config: (Optional.) A `ConfigProto` protocol buffer with configuration
        options for the session.
    """
    super(Session, self).__init__(target, graph, config=config)

    # Initialize MPI on the relevant device. The init op has to be created
    # in the graph this session launches, which is the default graph when
    # the caller did not supply one.
    # TODO: Move this to library load and eliminate mpi.Session()
    init_graph = tf.get_default_graph() if graph is None else graph
    with init_graph.as_default():
      mpi_init_op = init()
      self.run(mpi_init_op)
276