1# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Contrib version of MirroredStrategy."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21from tensorflow.python.distribute import distribute_lib
22from tensorflow.python.distribute import input_lib
23from tensorflow.python.distribute import mirrored_strategy
24
25
# Names bound from the core `mirrored_strategy` module. `CoreMirroredExtended`
# is the base class of `MirroredExtended` in this file; the remaining aliases
# are presumably kept so existing users of this contrib module keep working.
# pylint: disable=protected-access,invalid-name
CoreMirroredStrategy = mirrored_strategy.MirroredStrategy
CoreMirroredExtended = mirrored_strategy.MirroredExtended
all_local_devices = mirrored_strategy.all_local_devices
_call_for_each_replica = mirrored_strategy._call_for_each_replica
_create_mirrored_variable = mirrored_strategy._create_mirrored_variable
# pylint: enable=protected-access,invalid-name
33
34
class MirroredStrategy(distribute_lib.DistributionStrategy):
  """Mirrors vars to distribute across multiple devices and machines.

  *** contrib version ***

  This strategy uses one replica per device and sync replication for its
  multi-GPU version.

  When `cluster_spec` is given by the `configure` method, it turns into the
  multi-worker version that works on multiple workers with in-graph
  replication. Note: `configure` will be called by higher-level APIs if running
  in a distributed environment.

  There are several important concepts for distributed TensorFlow, e.g.
  `client`, `job`, `task`, `cluster`, `in-graph replication` and
  `synchronous training` and they have already been defined in the
  [TensorFlow's documentation](https://www.tensorflow.org/deploy/distributed).
  The distribution strategy inherits these concepts as well and in addition to
  that we also clarify several more concepts:

  * **In-graph replication**: the `client` creates a single `tf.Graph` that
    specifies tasks for devices on all workers. The `client` then creates a
    client session which will talk to the `master` service of a `worker`. Then
    the `master` will partition the graph and distribute the work to all
    participating workers.
  * **Worker**: A `worker` is a TensorFlow `task` that usually maps to one
    physical machine. We will have multiple `worker`s with different `task`
    index. They all do similar things except for one worker checkpointing model
    variables, writing summaries, etc. in addition to its ordinary work.

  The multi-worker version of this class maps one replica to one device on a
  worker. It mirrors all model variables on all replicas. For example, if you
  have two `worker`s and each `worker` has 4 GPUs, it will create 8 copies of
  the model variables on these 8 GPUs. Then like in MirroredStrategy, each
  replica performs its computation with its own copy of variables unless in
  cross-replica mode where variable or tensor reduction happens.

  Args:
    devices: a list of device strings.
    num_gpus: number of GPUs. For local training, either specify `devices` or
      `num_gpus`. In distributed training, this must be specified as number of
      GPUs on each worker.
    num_gpus_per_worker: number of GPUs per worker. This is the same as
      `num_gpus` and only one of `num_gpus` and `num_gpus_per_worker` can be
      specified.
    cross_device_ops: optional, a descendant of `CrossDeviceOps`. If this is not
      set, the `configure` method will try to find the best one.
    auto_shard_dataset: whether to auto-shard the dataset when there are
      multiple workers.
    cross_tower_ops: Deprecated alias for `cross_device_ops`.

  Raises:
    ValueError: if both `cross_device_ops` and `cross_tower_ops` are given, if
      both `num_gpus` and `num_gpus_per_worker` are given, or (from
      `MirroredExtended`) if `devices` is given together with a GPU count.
  """

  def __init__(self,
               devices=None,
               num_gpus=None,
               num_gpus_per_worker=None,
               cross_device_ops=None,
               auto_shard_dataset=False,
               cross_tower_ops=None):
    # Use an explicit exception instead of `assert`: asserts are stripped when
    # Python runs with -O, which would silently let the conflict through.
    if cross_device_ops and cross_tower_ops:
      raise ValueError(
          "You cannot specify both `cross_device_ops` and `cross_tower_ops`.")
    if num_gpus is not None and num_gpus_per_worker is not None:
      raise ValueError(
          "You cannot specify both `num_gpus` and `num_gpus_per_worker`.")
    # `num_gpus_per_worker` is accepted as an alias for `num_gpus`.
    if num_gpus is None:
      num_gpus = num_gpus_per_worker
    # `cross_tower_ops` is the deprecated spelling of `cross_device_ops`; at
    # most one of them is set at this point.
    extended = MirroredExtended(self, devices, num_gpus,
                                cross_device_ops or cross_tower_ops,
                                auto_shard_dataset)
    super(MirroredStrategy, self).__init__(extended)

  # Override to change the documentation to reflect the different handling of
  # global vs. local batch size between core and contrib.
  def make_dataset_iterator(self, dataset):  # pylint: disable=useless-super-delegation
    """Makes an iterator for input provided via `dataset`.

    NOTE: The batch size of the `dataset` argument is treated differently for
    this contrib version of `MirroredStrategy`.

    Data from the given dataset will be distributed evenly across all the
    compute replicas. We will assume that the input dataset is batched by the
    per-replica batch size.

    The user could also use `make_input_fn_iterator` if they want to
    customize which input is fed to which replica/worker etc.

    Args:
      dataset: `tf.data.Dataset` that will be distributed evenly across all
        replicas.

    Returns:
      A `tf.distribute.InputIterator` which returns inputs for each step of the
      computation.  User should call `initialize` on the returned iterator.
    """
    return super(MirroredStrategy, self).make_dataset_iterator(dataset)

  # Override to change the documentation to reflect the different handling of
  # global vs. local batch size between core and contrib.
  def experimental_make_numpy_iterator(  # pylint: disable=useless-super-delegation
      self, numpy_input, batch_size, num_epochs=1, shuffle=1024, session=None):
    """Makes an iterator for input provided via a nest of numpy arrays.

    NOTE: The `batch_size` argument here has different behavior for this
    contrib version of `MirroredStrategy`.

    Args:
      numpy_input: A nest of NumPy input arrays that will be distributed evenly
        across all replicas.
      batch_size: The number of entries from the array that each replica
        consumes in one step of the computation. This is the per-replica
        batch size. The global batch size will be this times
        `num_replicas_in_sync`.
      num_epochs: The number of times to iterate through the examples. A value
        of `None` means repeat forever.
      shuffle: Size of buffer to use for shuffling the input examples.
        Use `None` to disable shuffling.
      session: (TensorFlow v1.x graph execution only) A session used for
        initialization.

    Returns:
      A `tf.distribute.InputIterator` which returns inputs for each step of the
      computation.  User should call `initialize` on the returned iterator.
    """
    return super(MirroredStrategy, self).experimental_make_numpy_iterator(
        numpy_input, batch_size, num_epochs, shuffle, session)
159
160
class MirroredExtended(CoreMirroredExtended):
  """Extended implementation backing the contrib `MirroredStrategy`.

  Unlike the core extended class, datasets handed to this implementation are
  treated as already batched per replica (see `_make_dataset_iterator` and
  `_global_batch_size`).
  """

  def __init__(self,
               container_strategy,
               devices=None,
               num_gpus_per_worker=None,
               cross_device_ops=None,
               auto_shard_dataset=False):
    """Resolves the device list and initializes the core extended object.

    Args:
      container_strategy: the strategy object that owns this extended object.
      devices: optional list of device strings. When omitted, the device list
        is obtained from `mirrored_strategy.all_local_devices` using
        `num_gpus_per_worker`.
      num_gpus_per_worker: optional GPU count; mutually exclusive with
        `devices`.
      cross_device_ops: optional cross-device ops object, forwarded unchanged
        to the core implementation.
      auto_shard_dataset: whether to auto-shard the dataset when there are
        multiple workers; stored as `self._auto_shard_dataset` (presumably
        read by the parent class — not visible here).

    Raises:
      ValueError: if both `devices` and `num_gpus_per_worker` are given.
    """
    # Reject conflicting arguments up front, before any device discovery.
    if devices is not None and num_gpus_per_worker is not None:
      raise ValueError(
          "Must only specify one of `devices` and `num_gpus_per_worker`.")
    if devices is None:
      devices = mirrored_strategy.all_local_devices(num_gpus_per_worker)
    super(MirroredExtended, self).__init__(container_strategy, devices,
                                           cross_device_ops)
    self._auto_shard_dataset = auto_shard_dataset

  def _make_dataset_iterator(self, dataset):
    """Wraps `dataset` in an input iterator without splitting its batches.

    For backward compatibility this differs from
    `tf.distribute.MirroredStrategy`: the incoming dataset's batch size is
    interpreted as the per-replica batch size, so batches are not divided
    among replicas.

    Args:
      dataset: `tf.data.Dataset` for input.

    Returns:
      An `InputIterator` which returns inputs for each step of the computation.
    """
    input_workers = self._input_workers
    return input_lib.DatasetIterator(dataset, input_workers)

  # TODO(priyag): Delete this once all strategies use global batch size.
  @property
  def _global_batch_size(self):
    """Always False: contrib MirroredStrategy works with per-replica batches."""
    return False
198