# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utilities for multi-gpu training."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.python.framework import ops
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.engine.training import Model
from tensorflow.python.keras.layers.core import Lambda
from tensorflow.python.keras.layers.merge import concatenate
from tensorflow.python.ops import array_ops


def _get_available_devices():
  return [x.name for x in K.get_session().list_devices()]


def _normalize_device_name(name):
  # Turn a fully qualified device name such as
  # '/job:localhost/replica:0/task:0/device:GPU:0' into '/gpu:0'.
  name = '/' + name.lower().split('device:')[1]
  return name


def multi_gpu_model(model, gpus, cpu_merge=True, cpu_relocation=False):
  """Replicates a model on different GPUs.

  Specifically, this function implements single-machine
  multi-GPU data parallelism. It works in the following way:

  - Divide the model's input(s) into multiple sub-batches.
  - Apply a model copy on each sub-batch. Every model copy
    is executed on a dedicated GPU.
  - Concatenate the results (on CPU) into one big batch.

  E.g. if your `batch_size` is 64 and you use `gpus=2`,
  then we will divide the input into 2 sub-batches of 32 samples,
  process each sub-batch on one GPU, then return the full
  batch of 64 processed samples.

  This induces quasi-linear speedup on up to 8 GPUs.

  This function is only available with the TensorFlow backend
  for the time being.

  Args:
    model: A Keras model instance. To avoid OOM errors,
      this model could have been built on CPU, for instance
      (see usage example below).
    gpus: Integer >= 2 or list of integers, number of GPUs or
      list of GPU IDs on which to create model replicas.
    cpu_merge: A boolean value to identify whether to force
      merging model weights under the scope of the CPU or not.
    cpu_relocation: A boolean value to identify whether to
      create the model's weights under the scope of the CPU.
      If the model is not defined under any preceding device
      scope, you can still rescue it by activating this option.

  Returns:
    A Keras `Model` instance which can be used just like the initial
    `model` argument, but which distributes its workload on multiple GPUs.

  Example 1: Training models with weights merge on CPU

  ```python
      import tensorflow as tf
      from keras.applications import Xception
      from keras.utils import multi_gpu_model
      import numpy as np

      num_samples = 1000
      height = 224
      width = 224
      num_classes = 1000

      # Instantiate the base model (or "template" model).
      # We recommend doing this under a CPU device scope,
      # so that the model's weights are hosted on CPU memory.
      # Otherwise they may end up hosted on a GPU, which would
      # complicate weight sharing.
      with tf.device('/cpu:0'):
          model = Xception(weights=None,
                           input_shape=(height, width, 3),
                           classes=num_classes)

      # Replicates the model on 8 GPUs.
      # This assumes that your machine has 8 available GPUs.
      parallel_model = multi_gpu_model(model, gpus=8)
      parallel_model.compile(loss='categorical_crossentropy',
                             optimizer='rmsprop')

      # Generate dummy data.
      x = np.random.random((num_samples, height, width, 3))
      y = np.random.random((num_samples, num_classes))

      # This `fit` call will be distributed on 8 GPUs.
      # Since the batch size is 256, each GPU will process 32 samples.
      parallel_model.fit(x, y, epochs=20, batch_size=256)

      # Save model via the template model (which shares the same weights):
      model.save('my_model.h5')
  ```
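
  The `gpus` argument also accepts a list of specific GPU ids. A minimal
  sketch (assuming a machine that exposes `/gpu:0` and `/gpu:1`, so the
  two calls below are equivalent):

  ```python
      parallel_model = multi_gpu_model(model, gpus=2)
      parallel_model = multi_gpu_model(model, gpus=[0, 1])
  ```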

  Example 2: Training models with weights merge on CPU using cpu_relocation

  ```python
      ..
      # No need to change the device scope for model definition:
      model = Xception(weights=None, ..)

      try:
          model = multi_gpu_model(model, cpu_relocation=True)
          print("Training using multiple GPUs..")
      except ValueError:
          print("Training using single GPU or CPU..")

      model.compile(..)
      ..
  ```

  Example 3: Training models with weights merge on GPU (recommended for NVLink)

  ```python
      ..
      # No need to change the device scope for model definition:
      model = Xception(weights=None, ..)

      try:
          model = multi_gpu_model(model, cpu_merge=False)
          print("Training using multiple GPUs..")
      except ValueError:
          print("Training using single GPU or CPU..")

      model.compile(..)
      ..
  ```

  Raises:
    ValueError: if the `gpus` argument does not match available devices.
  """
  if isinstance(gpus, (list, tuple)):
    if len(gpus) <= 1:
      raise ValueError('For multi-gpu usage to be effective, '
                       'call `multi_gpu_model` with `len(gpus) >= 2`. '
                       'Received: `gpus=%s`' % gpus)
    num_gpus = len(gpus)
    target_gpu_ids = gpus
  else:
    if gpus <= 1:
      raise ValueError('For multi-gpu usage to be effective, '
                       'call `multi_gpu_model` with `gpus >= 2`. '
                       'Received: `gpus=%s`' % gpus)
    num_gpus = gpus
    target_gpu_ids = range(num_gpus)

  target_devices = ['/cpu:0'] + ['/gpu:%d' % i for i in target_gpu_ids]
  available_devices = _get_available_devices()
  available_devices = [
      _normalize_device_name(name) for name in available_devices
  ]
  for device in target_devices:
    if device not in available_devices:
      raise ValueError('To call `multi_gpu_model` with `gpus=%s`, '
                       'we expect the following devices to be available: %s. '
                       'However this machine only has: %s. '
                       'Try reducing `gpus`.' % (gpus, target_devices,
                                                 available_devices))

  def get_slice(data, i, parts):
    """Slice an array into `parts` slices and return slice `i`.

    Args:
      data: array to slice.
      i: index of slice to return.
      parts: number of slices to make.

    Returns:
      Slice `i` of `data`.
    """
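    # For example, with batch_size = 10 and parts = 3: step = 10 // 3 = 3,
    # so slices 0 and 1 each get 3 samples, and the last slice absorbs the
    # remainder (10 - 3 * 2 = 4 samples). Only the batch (first) axis is
    # sliced; all remaining dimensions are passed through whole.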
    shape = array_ops.shape(data)
    batch_size = shape[:1]
    input_shape = shape[1:]
    step = batch_size // parts
    if i == parts - 1:
      size = batch_size - step * i
    else:
      size = step
    size = array_ops.concat([size, input_shape], axis=0)
    # `stride` is `step` along the batch axis and 0 elsewhere, so `start`
    # offsets only the batch dimension.
    stride = array_ops.concat([step, input_shape * 0], axis=0)
    start = stride * i
    return array_ops.slice(data, start, size)

  # Relocate the model definition under CPU device scope if needed
  if cpu_relocation:
    from tensorflow.python.keras.models import clone_model  # pylint: disable=g-import-not-at-top

    with ops.device('/cpu:0'):
      model = clone_model(model)

  all_outputs = [[] for _ in range(len(model.outputs))]

  # Place a copy of the model on each GPU,
  # each getting a slice of the inputs.
  for i, gpu_id in enumerate(target_gpu_ids):
    with ops.device('/gpu:%d' % gpu_id):
      with K.name_scope('replica_%d' % gpu_id):
        inputs = []
        # Retrieve a slice of the input.
        for x in model.inputs:
          input_shape = tuple(x.shape.as_list())[1:]
          slice_i = Lambda(
              get_slice,
              output_shape=input_shape,
              arguments={
                  'i': i,
                  'parts': num_gpus
              })(x)
          inputs.append(slice_i)

        # Apply model on slice
        # (creating a model replica on the target device).
        outputs = model(inputs)
        if not isinstance(outputs, list):
          outputs = [outputs]

        # Save the outputs for merging back together later.
        for o, output in enumerate(outputs):
          all_outputs[o].append(output)

  # Deduplicate output names to handle Siamese networks
  # (e.g. two outputs both named 'out' become 'out_1' and 'out_2').
  occurrences = {}
  for n in model.output_names:
    if n not in occurrences:
      occurrences[n] = 1
    else:
      occurrences[n] += 1
  conflict_counter = {n: 0 for n, count in occurrences.items() if count > 1}
  output_names = []
  for n in model.output_names:
    if n in conflict_counter:
      conflict_counter[n] += 1
      n += '_%d' % conflict_counter[n]
    output_names.append(n)

  # Merge outputs under expected scope.
  with ops.device('/cpu:0' if cpu_merge else '/gpu:%d' % target_gpu_ids[0]):
    merged = []
    for name, outputs in zip(output_names, all_outputs):
      merged.append(concatenate(outputs, axis=0, name=name))
    return Model(model.inputs, merged)
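

# A minimal smoke-test sketch (illustrative only, assuming a machine with at
# least two visible GPUs; the tiny Sequential model below is hypothetical and
# not part of the library):
#
#   import numpy as np
#   from tensorflow.python.keras.layers import Dense
#   from tensorflow.python.keras.models import Sequential
#
#   base = Sequential([Dense(4, input_shape=(8,))])
#   parallel = multi_gpu_model(base, gpus=2)
#   parallel.compile(loss='mse', optimizer='sgd')
#   # With batch_size=16 and gpus=2, each replica processes 8 samples.
#   parallel.fit(np.random.random((64, 8)), np.random.random((64, 4)),
#                epochs=1, batch_size=16)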