# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Contrib version of MirroredStrategy."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.python.distribute import distribute_lib
from tensorflow.python.distribute import input_lib
from tensorflow.python.distribute import mirrored_strategy


# pylint: disable=protected-access,invalid-name
_call_for_each_replica = mirrored_strategy._call_for_each_replica
_create_mirrored_variable = mirrored_strategy._create_mirrored_variable
all_local_devices = mirrored_strategy.all_local_devices
CoreMirroredStrategy = mirrored_strategy.MirroredStrategy
CoreMirroredExtended = mirrored_strategy.MirroredExtended
# pylint: enable=protected-access,invalid-name


class MirroredStrategy(distribute_lib.DistributionStrategy):
  """Mirrors vars to distribute across multiple devices and machines.

  *** contrib version ***

  This strategy uses one replica per device and sync replication for its
  multi-GPU version.

  When `cluster_spec` is given by the `configure` method, it turns into the
  multi-worker version that works on multiple workers with in-graph
  replication. Note: `configure` will be called by higher-level APIs if
  running in a distributed environment.

  There are several important concepts for distributed TensorFlow, e.g.
  `client`, `job`, `task`, `cluster`, `in-graph replication` and
  `synchronous training`, and they have already been defined in the
  [TensorFlow documentation](https://www.tensorflow.org/deploy/distributed).
  The distribution strategy inherits these concepts, and in addition we
  clarify several more:

  * **In-graph replication**: the `client` creates a single `tf.Graph` that
    specifies tasks for devices on all workers. The `client` then creates a
    client session which will talk to the `master` service of a `worker`.
    The `master` will then partition the graph and distribute the work to
    all participating workers.
  * **Worker**: A `worker` is a TensorFlow `task` that usually maps to one
    physical machine. We will have multiple `worker`s with different `task`
    indices. They all do similar things, except that one worker also
    checkpoints model variables, writes summaries, etc. in addition to its
    ordinary work.

  The multi-worker version of this class maps one replica to one device on a
  worker. It mirrors all model variables on all replicas. For example, if you
  have two `worker`s and each `worker` has 4 GPUs, it will create 8 copies of
  the model variables on these 8 GPUs. Then, as in `MirroredStrategy`, each
  replica performs its computation with its own copy of variables unless in
  cross-replica mode where variable or tensor reduction happens.
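
  An illustrative usage sketch for local multi-GPU training (this assumes two
  GPUs are available, that this class is exported as
  `tf.contrib.distribute.MirroredStrategy`, and that `create_model` is a
  user-defined, hypothetical function):

  ```python
  strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=2)
  with strategy.scope():
    # Variables created under the strategy scope are mirrored on both GPUs.
    model = create_model()
  ```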

  Args:
    devices: a list of device strings.
    num_gpus: number of GPUs. For local training, either specify `devices` or
      `num_gpus`. In distributed training, this must be specified as the
      number of GPUs on each worker.
    num_gpus_per_worker: number of GPUs per worker. This is the same as
      `num_gpus` and only one of `num_gpus` and `num_gpus_per_worker` can be
      specified.
    cross_device_ops: optional, a descendant of `CrossDeviceOps`. If this is
      not set, the `configure` method will try to find the best one.
    auto_shard_dataset: whether to auto-shard the dataset when there are
      multiple workers.
    cross_tower_ops: Deprecated alias for `cross_device_ops`.
  """

  def __init__(self,
               devices=None,
               num_gpus=None,
               num_gpus_per_worker=None,
               cross_device_ops=None,
               auto_shard_dataset=False,
               cross_tower_ops=None):
    assert not (cross_device_ops and cross_tower_ops)
    if num_gpus is not None and num_gpus_per_worker is not None:
      raise ValueError(
          "You cannot specify both `num_gpus` and `num_gpus_per_worker`.")
    if num_gpus is None:
      num_gpus = num_gpus_per_worker
    extended = MirroredExtended(self, devices, num_gpus,
                                cross_device_ops or cross_tower_ops,
                                auto_shard_dataset)
    super(MirroredStrategy, self).__init__(extended)

  # Override to change the documentation to reflect the different handling of
  # global vs. local batch size between core and contrib.
  def make_dataset_iterator(self, dataset):  # pylint: disable=useless-super-delegation
    """Makes an iterator for input provided via `dataset`.

    NOTE: The batch size of the `dataset` argument is treated differently for
    this contrib version of `MirroredStrategy`.

    Data from the given dataset will be distributed evenly across all the
    compute replicas. We will assume that the input dataset is batched by the
    per-replica batch size.

    The user could also use `make_input_fn_iterator` if they want to
    customize which input is fed to which replica/worker etc.

    Args:
      dataset: `tf.data.Dataset` that will be distributed evenly across all
        replicas.

    Returns:
      A `tf.distribute.InputIterator` which returns inputs for each step of
      the computation. The user should call `initialize` on the returned
      iterator.
    """
    return super(MirroredStrategy, self).make_dataset_iterator(dataset)

  # Override to change the documentation to reflect the different handling of
  # global vs. local batch size between core and contrib.
  def experimental_make_numpy_iterator(  # pylint: disable=useless-super-delegation
      self, numpy_input, batch_size, num_epochs=1, shuffle=1024, session=None):
    """Makes an iterator for input provided via a nest of numpy arrays.

    NOTE: The `batch_size` argument here has different behavior for this
    contrib version of `MirroredStrategy`.

    Args:
      numpy_input: A nest of NumPy input arrays that will be distributed
        evenly across all replicas.
      batch_size: The number of entries from the arrays to consume in one
        step of the computation on each replica. This is the per-replica
        batch size; the global batch size will be this times
        `num_replicas_in_sync`.
      num_epochs: The number of times to iterate through the examples. A
        value of `None` means repeat forever.
      shuffle: Size of the buffer to use for shuffling the input examples.
        Use `None` to disable shuffling.
      session: (TensorFlow v1.x graph execution only) A session used for
        initialization.

    Returns:
      A `tf.distribute.InputIterator` which returns inputs for each step of
      the computation. The user should call `initialize` on the returned
      iterator.
    """
    return super(MirroredStrategy, self).experimental_make_numpy_iterator(
        numpy_input, batch_size, num_epochs, shuffle, session)


class MirroredExtended(CoreMirroredExtended):
  """Implementation of (contrib) MirroredStrategy."""

  def __init__(self,
               container_strategy,
               devices=None,
               num_gpus_per_worker=None,
               cross_device_ops=None,
               auto_shard_dataset=False):
    if devices is None:
      devices = mirrored_strategy.all_local_devices(num_gpus_per_worker)
    elif num_gpus_per_worker is not None:
      raise ValueError(
          "Must only specify one of `devices` and `num_gpus_per_worker`.")
    super(MirroredExtended, self).__init__(container_strategy, devices,
                                           cross_device_ops)
    self._auto_shard_dataset = auto_shard_dataset

  def _make_dataset_iterator(self, dataset):
    """Makes an iterator from the dataset without splitting the batch.

    This implementation is different from the one in
    `tf.distribute.MirroredStrategy` for purposes of backward compatibility.
    We treat the incoming dataset's batch size as the per-replica batch size.

    Args:
      dataset: `tf.data.Dataset` for input.

    Returns:
      An `InputIterator` which returns inputs for each step of the
      computation.
    """
    return input_lib.DatasetIterator(dataset, self._input_workers)

  # TODO(priyag): Delete this once all strategies use global batch size.
  @property
  def _global_batch_size(self):
    """The contrib version of MirroredStrategy uses per-replica batch size."""
    return False
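
# Illustrative note (a sketch, not part of this module's API surface): because
# `_global_batch_size` above is False, a dataset passed to
# `make_dataset_iterator` is expected to be batched by the *per-replica* batch
# size. For example, assuming 2 GPUs and a hypothetical `features` array, the
# effective global batch size in the snippet below would be 2 * 32 = 64:
#
#   strategy = MirroredStrategy(num_gpus=2)
#   dataset = tf.data.Dataset.from_tensor_slices(features).batch(32)
#   iterator = strategy.make_dataset_iterator(dataset)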