# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
15"""Utilities for multi-gpu training."""
16from __future__ import absolute_import
17from __future__ import division
18from __future__ import print_function
19
20from tensorflow.python.framework import ops
21from tensorflow.python.keras import backend as K
22from tensorflow.python.keras.engine.training import Model
23from tensorflow.python.ops import array_ops
24from tensorflow.python.util.tf_export import keras_export


def _get_available_devices():
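  # Device names as reported by the backend session, e.g.
  # '/job:localhost/replica:0/task:0/device:GPU:0'.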
  return [x.name for x in K.get_session().list_devices()]


def _normalize_device_name(name):
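  # E.g. '/job:localhost/replica:0/task:0/device:GPU:0' -> '/gpu:0'.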
  name = '/' + name.lower().split('device:')[1]
  return name


@keras_export('keras.utils.multi_gpu_model')
def multi_gpu_model(model, gpus, cpu_merge=True, cpu_relocation=False):
38  """Replicates a model on different GPUs.
39
40  Specifically, this function implements single-machine
41  multi-GPU data parallelism. It works in the following way:
42
43  - Divide the model's input(s) into multiple sub-batches.
44  - Apply a model copy on each sub-batch. Every model copy
45      is executed on a dedicated GPU.
46  - Concatenate the results (on CPU) into one big batch.
47
48  E.g. if your `batch_size` is 64 and you use `gpus=2`,
49  then we will divide the input into 2 sub-batches of 32 samples,
50  process each sub-batch on one GPU, then return the full
51  batch of 64 processed samples.
52
53  This induces quasi-linear speedup on up to 8 GPUs.
54
55  This function is only available with the TensorFlow backend
56  for the time being.
57
  Arguments:
      model: A Keras model instance. To avoid OOM errors,
          this model could have been built on CPU, for instance
          (see usage example below).
      gpus: Integer >= 2 or list of integers, number of GPUs or
          list of GPU IDs on which to create model replicas.
      cpu_merge: Whether to force merging model weights
          under the scope of the CPU.
      cpu_relocation: Whether to create the model's weights
          under the scope of the CPU. If the model is not defined
          under any preceding device scope, you can still rescue it
          by activating this option.

  Returns:
      A Keras `Model` instance which can be used just like the initial
      `model` argument, but which distributes its workload on multiple GPUs.
  Example 1: Training models with weights merge on CPU

  ```python
      import tensorflow as tf
      from tensorflow.keras.applications import Xception
      from tensorflow.keras.utils import multi_gpu_model
      import numpy as np

      num_samples = 1000
      height = 224
      width = 224
      num_classes = 1000

      # Instantiate the base model (or "template" model).
      # We recommend doing this under a CPU device scope,
      # so that the model's weights are hosted on CPU memory.
      # Otherwise they may end up hosted on a GPU, which would
      # complicate weight sharing.
      with tf.device('/cpu:0'):
          model = Xception(weights=None,
                           input_shape=(height, width, 3),
                           classes=num_classes)

      # Replicate the model on 8 GPUs.
      # This assumes that your machine has 8 available GPUs.
      parallel_model = multi_gpu_model(model, gpus=8)
      parallel_model.compile(loss='categorical_crossentropy',
                             optimizer='rmsprop')

      # Generate dummy data.
      x = np.random.random((num_samples, height, width, 3))
      y = np.random.random((num_samples, num_classes))

      # This `fit` call will be distributed on 8 GPUs.
      # Since the batch size is 256, each GPU will process 32 samples.
      parallel_model.fit(x, y, epochs=20, batch_size=256)

      # Save the model via the template model (which shares the same weights):
      model.save('my_model.h5')
  ```
  Example 2: Training models with weights merge on CPU using cpu_relocation

  ```python
       ..
       # No need to change the device scope for model definition:
       model = Xception(weights=None, ..)

       try:
           model = multi_gpu_model(model, cpu_relocation=True)
           print("Training using multiple GPUs..")
       except ValueError:
           print("Training using single GPU or CPU..")

       model.compile(..)
       ..
  ```

  Example 3: Training models with weights merge on GPU (recommended for NVLink)

  ```python
       ..
       # No need to change the device scope for model definition:
       model = Xception(weights=None, ..)

       try:
           model = multi_gpu_model(model, cpu_merge=False)
           print("Training using multiple GPUs..")
       except ValueError:
           print("Training using single GPU or CPU..")
       model.compile(..)
       ..
  ```
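
  Example 4: Replicating on a specific subset of GPUs (a minimal sketch;
  as the Arguments above note, `gpus` may also be a list of device IDs
  instead of a count)

  ```python
       ..
       model = Xception(weights=None, ..)

       # Use only the second and third GPUs of the machine.
       parallel_model = multi_gpu_model(model, gpus=[1, 2])
       parallel_model.compile(..)
       ..
  ```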

  Raises:
    ValueError: if the `gpus` argument does not match available devices.
  """
  # pylint: disable=g-import-not-at-top
  from tensorflow.python.keras.layers.core import Lambda
  from tensorflow.python.keras.layers.merge import concatenate

  if isinstance(gpus, (list, tuple)):
    if len(gpus) <= 1:
      raise ValueError('For multi-gpu usage to be effective, '
                       'call `multi_gpu_model` with `len(gpus) >= 2`. '
                       'Received: `gpus=%s`' % gpus)
    num_gpus = len(gpus)
    target_gpu_ids = gpus
  else:
    if gpus <= 1:
      raise ValueError('For multi-gpu usage to be effective, '
                       'call `multi_gpu_model` with `gpus >= 2`. '
                       'Received: `gpus=%s`' % gpus)
    num_gpus = gpus
    target_gpu_ids = range(num_gpus)

  target_devices = ['/cpu:0'] + ['/gpu:%d' % i for i in target_gpu_ids]
  available_devices = _get_available_devices()
  available_devices = [
      _normalize_device_name(name) for name in available_devices
  ]
  for device in target_devices:
    if device not in available_devices:
      raise ValueError('To call `multi_gpu_model` with `gpus=%s`, '
                       'we expect the following devices to be available: %s. '
                       'However this machine only has: %s. '
                       'Try reducing `gpus`.' % (gpus, target_devices,
                                                 available_devices))

  def get_slice(data, i, parts):
    """Slice an array into `parts` slices and return slice `i`.

    Arguments:
      data: array to slice.
      i: index of slice to return.
      parts: number of slices to make.

    Returns:
      Slice `i` of `data`.
    """
    shape = array_ops.shape(data)
    batch_size = shape[:1]
    input_shape = shape[1:]
    step = batch_size // parts
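    # E.g. a batch of 10 samples with `parts=3` gives `step = 3` and slice
    # sizes [3, 3, 4]: the last slice absorbs the remainder.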
    if i == parts - 1:
      size = batch_size - step * i
    else:
      size = step
    size = array_ops.concat([size, input_shape], axis=0)
    stride = array_ops.concat([step, input_shape * 0], axis=0)
    start = stride * i
    return array_ops.slice(data, start, size)

  # Relocate the model definition under CPU device scope if needed.
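  # Note: `clone_model` rebuilds the layers (with freshly initialized
  # weights) under the CPU scope, so relocation only makes sense before
  # the model has been trained.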
  if cpu_relocation:
    from tensorflow.python.keras.models import clone_model  # pylint: disable=g-import-not-at-top
    with ops.device('/cpu:0'):
      model = clone_model(model)

  all_outputs = [[] for _ in range(len(model.outputs))]

  # Place a copy of the model on each GPU,
  # each getting a slice of the inputs.
  for i, gpu_id in enumerate(target_gpu_ids):
    with ops.device('/gpu:%d' % gpu_id):
      with ops.name_scope('replica_%d' % gpu_id):
        inputs = []
        # Retrieve a slice of the input.
        for x in model.inputs:
          input_shape = tuple(x.get_shape().as_list())[1:]
          slice_i = Lambda(
              get_slice,
              output_shape=input_shape,
              arguments={'i': i, 'parts': num_gpus})(x)
          inputs.append(slice_i)

        # Apply the model on the slice
        # (creating a model replica on the target device).
        outputs = model(inputs)
        if not isinstance(outputs, list):
          outputs = [outputs]

        # Save the outputs for merging back together later.
        for o, output in enumerate(outputs):
          all_outputs[o].append(output)

  # Deduplicate output names to handle Siamese networks.
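  # E.g. two outputs that share the name 'out' are renamed to 'out_1'
  # and 'out_2' so the merged model's output names stay unique.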
  occurrences = {}
  for n in model.output_names:
    if n not in occurrences:
      occurrences[n] = 1
    else:
      occurrences[n] += 1
  conflict_counter = {n: 0 for n, count in occurrences.items() if count > 1}
  output_names = []
  for n in model.output_names:
    if n in conflict_counter:
      conflict_counter[n] += 1
      n += '_%d' % conflict_counter[n]
    output_names.append(n)

  # Merge outputs under expected scope.
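  # Concatenating along axis=0 (the batch dimension) reassembles the
  # per-GPU sub-batches into the full batch.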
  with ops.device('/cpu:0' if cpu_merge else '/gpu:%d' % target_gpu_ids[0]):
    merged = []
    for name, outputs in zip(output_names, all_outputs):
      merged.append(concatenate(outputs, axis=0, name=name))
    return Model(model.inputs, merged)