# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Library of TPU helper functions."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math
import numpy as np
from six.moves import xrange  # pylint: disable=redefined-builtin

from tensorflow.python.tpu.topology import Topology
from tensorflow.python.util.tf_export import tf_export


SINGLE_CORE_ASSIGNMENT = [[[0, 0, 0]]]


def _compute_task_and_cores_to_replicas(core_assignment, topology):
  """Computes a nested dict which maps task and logical core to replicas."""
  task_and_cores_to_replicas = {}
  for replica in xrange(core_assignment.shape[0]):
    for logical_core in xrange(core_assignment.shape[1]):
      coordinates = core_assignment[replica, logical_core, :]
      task_id = topology.task_ordinal_at_coordinates(coordinates)
      if task_id not in task_and_cores_to_replicas:
        task_and_cores_to_replicas[task_id] = {}
      if logical_core not in task_and_cores_to_replicas[task_id]:
        task_and_cores_to_replicas[task_id][logical_core] = set()

      task_and_cores_to_replicas[task_id][logical_core].add(replica)

  task_to_sorted_replica_id = {}

  for task, core_to_replicas in task_and_cores_to_replicas.items():
    core_to_sorted_replicas = {}
    for core, replicas in core_to_replicas.items():
      core_to_sorted_replicas[core] = sorted(replicas)

    task_to_sorted_replica_id[task] = core_to_sorted_replicas
  return task_to_sorted_replica_id


@tf_export("tpu.experimental.DeviceAssignment")
class DeviceAssignment(object):
  """Mapping from logical cores in a computation to the physical TPU topology.

  Prefer to use the `DeviceAssignment.build()` helper to construct a
  `DeviceAssignment`; it is easier, if less flexible, than constructing a
  `DeviceAssignment` directly.
  """

  def __init__(self, topology, core_assignment):
    """Constructs a `DeviceAssignment` object.

    Args:
      topology: A `Topology` object that describes the physical TPU topology.
      core_assignment: A logical to physical core mapping, represented as a
        rank 3 numpy array. See the description of the `core_assignment`
        property for more details.

    Raises:
      ValueError: If `topology` is not a `Topology` object.
      ValueError: If `core_assignment` is not a rank 3 numpy array.
78 """ 79 if not isinstance(topology, Topology): 80 raise ValueError("topology must be a Topology object, got {}".format( 81 type(topology))) 82 core_assignment = np.asarray(core_assignment, dtype=np.int32) 83 84 self._topology = topology 85 86 if core_assignment.ndim != 3: 87 raise ValueError("core_assignment must be a rank 3 numpy array, " 88 "got shape {}".format(core_assignment.shape)) 89 90 self._num_replicas = core_assignment.shape[0] 91 self._num_cores_per_replica = core_assignment.shape[1] 92 93 if core_assignment.shape[-1] != topology.mesh_rank: 94 raise ValueError( 95 "minor dimension of core_assignment must have size equal to topology " 96 "rank ({}), got shape {}".format(topology.mesh_rank, 97 core_assignment.shape)) 98 99 self._core_assignment = core_assignment 100 self._task_and_cores_to_replicas = _compute_task_and_cores_to_replicas( 101 self._core_assignment, topology) 102 103 @property 104 def topology(self): 105 """A `Topology` that describes the TPU topology.""" 106 return self._topology 107 108 @property 109 def num_cores_per_replica(self): 110 """The number of cores per replica.""" 111 return self._num_cores_per_replica 112 113 @property 114 def num_replicas(self): 115 """The number of replicas of the computation.""" 116 return self._num_replicas 117 118 @property 119 def core_assignment(self): 120 """The logical to physical core mapping. 121 122 Returns: 123 An integer numpy array of rank 3, with shape 124 `[num_replicas, num_cores_per_replica, topology_rank]`. Maps 125 (replica, logical core) pairs to physical topology coordinates. 126 """ 127 return self._core_assignment 128 129 def coordinates(self, replica, logical_core): 130 """Returns the physical topology coordinates of a logical core.""" 131 return tuple(self.core_assignment[replica, logical_core, :]) 132 133 def lookup_replicas(self, task_id, logical_core): 134 """Lookup replica ids by task number and logical core. 135 136 Args: 137 task_id: TensorFlow task number. 138 logical_core: An integer, identifying a logical core. 139 Returns: 140 A sorted list of the replicas that are attached to that task and 141 logical_core. 142 Raises: 143 ValueError: If no replica exists in the task which contains the logical 144 core. 145 """ 146 try: 147 return self._task_and_cores_to_replicas[task_id][logical_core] 148 except KeyError: 149 raise ValueError( 150 "Can not find any replica in task: {} contains logical_core: {} ". 
          format(task_id, logical_core))

  def tpu_ordinal(self, replica=0, logical_core=0):
    """Returns the ordinal of the TPU device assigned to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.tpu_device_ordinal_at_coordinates(coordinates)

  def host_device(self, replica=0, logical_core=0, job=None):
    """Returns the CPU device attached to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.cpu_device_name_at_coordinates(coordinates, job=job)

  def tpu_device(self, replica=0, logical_core=0, job=None):
    """Returns the name of the TPU device assigned to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.tpu_device_name_at_coordinates(coordinates, job=job)

  @staticmethod
  def build(topology,
            computation_shape=None,
            computation_stride=None,
            num_replicas=1):
    """Builds a `DeviceAssignment`; see `device_assignment` for details."""
    return device_assignment(topology, computation_shape, computation_stride,
                             num_replicas)


def device_assignment(topology,
                      computation_shape=None,
                      computation_stride=None,
                      num_replicas=1):
  """Computes a device_assignment of a computation across a TPU topology.

  Attempts to choose a compact grid of cores for locality.

  Returns a `DeviceAssignment` that describes the cores in the topology
  assigned to each core of each replica.

  `computation_shape` and `computation_stride` values should be powers of 2
  for optimal packing.

  Args:
    topology: A `Topology` object that describes the TPU cluster topology.
      To obtain a TPU topology, evaluate the `Tensor` returned by
      `initialize_system` using `Session.run`. Either a serialized
      `TopologyProto` or a `Topology` object may be passed. Note: you must
      evaluate the `Tensor` first; you cannot pass an unevaluated `Tensor`
      here.
    computation_shape: A rank 1 int32 numpy array with size equal to the
      topology rank, describing the shape of the computation's block of
      cores. If None, the `computation_shape` is `[1] * topology_rank`.
    computation_stride: A rank 1 int32 numpy array of size `topology_rank`,
      describing the inter-core spacing of the `computation_shape` cores in
      the TPU topology. If None, the `computation_stride` is
      `[1] * topology_rank`.
    num_replicas: The number of computation replicas to run. The replicas will
      be packed into the free spaces of the topology.

  Returns:
    A `DeviceAssignment` object, which describes the mapping between the
    logical cores in each computation replica and the physical cores in the
    TPU topology.

  Raises:
    ValueError: If `topology` is not a valid `Topology` object.
    ValueError: If `computation_shape` or `computation_stride` are not rank 1
      int32 numpy arrays of shape `[topology_rank]` with all values positive.
    ValueError: If the computation's replicas cannot fit into the TPU
      topology.
  """
  # Deserialize the Topology proto, if it is a string.
  if isinstance(topology, bytes):
    topology = Topology(serialized=topology)

  if not isinstance(topology, Topology):
    raise ValueError("`topology` is not a Topology object; got {}".format(
        type(topology)))

  topology_rank = len(topology.mesh_shape)
  mesh_shape = topology.mesh_shape
  if computation_shape is None:
    computation_shape = np.array([1] * topology_rank, dtype=np.int32)
  else:
    computation_shape = np.asarray(computation_shape, dtype=np.int32)

  if computation_stride is None:
    computation_stride = np.array([1] * topology_rank, dtype=np.int32)
  else:
    computation_stride = np.asarray(computation_stride, dtype=np.int32)

  if computation_shape.shape != (topology_rank,):
    raise ValueError("computation_shape must have shape [{}]; got {}".format(
        topology_rank, computation_shape.shape))
  if computation_stride.shape != (topology_rank,):
    raise ValueError("computation_stride must have shape [{}]; got {}".format(
        topology_rank, computation_stride.shape))

  if any(computation_shape < 1):
    raise ValueError(
        "computation_shape must be positive; got computation_shape={}".format(
            computation_shape))
  if any(computation_stride < 1):
    raise ValueError(
        "computation_stride must be positive; got computation_stride={}".format(
            computation_stride))

  # Computes the physical size of one computation instance.
  computation_footprint = computation_shape * computation_stride
  if any(computation_footprint > mesh_shape):
    raise ValueError(
        "computation footprint {} does not fit in TPU topology shape {}".format(
            computation_footprint, mesh_shape))

  # Computes how many copies of the computation footprint fit in the mesh.
  block_counts = mesh_shape // computation_footprint

  replica_counts = block_counts * computation_stride
  max_replicas = np.prod(replica_counts)
  if num_replicas > max_replicas:
    raise ValueError(
        "requested {} replicas but only {} replicas with shape {} and "
        "computation_stride {} fit in a TPU mesh of shape {}".format(
            num_replicas, max_replicas, computation_shape, computation_stride,
            mesh_shape))

  def ceil_of_ratio(n, m):
    return (n + m - 1) // m

  replica_shape = [0] * topology_rank
  if num_replicas > 0:
    remaining_replicas = num_replicas
    remaining_dims = topology_rank

    # Choose dimensions as close to an equal cube as possible, in order of
    # increasing dimension size. By visiting dimensions in increasing size, we
    # assign the most constrained dimension first, so we won't make infeasible
    # choices.
    #
    # As a secondary sort order, visit the dimensions in reverse order. This
    # means we try to use both cores on the same chip in preference to two
    # cores on different chips.
    for x, ni in sorted(((x, -i) for (i, x) in enumerate(replica_counts))):
      i = -ni
      target_size = int(math.ceil(remaining_replicas**(1.0 / remaining_dims)))
      replica_shape[i] = min(target_size, x)
      remaining_replicas = ceil_of_ratio(remaining_replicas, replica_shape[i])
      remaining_dims -= 1

    assert remaining_replicas == 1 and remaining_dims == 0

  # Assigns an offset to each replica such that no two replicas overlap.
  replica_offsets = np.full([num_replicas, topology_rank], -1, dtype=np.int32)
  for replica in xrange(num_replicas):
    # Chooses a replica number in each axis.
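    # The linear `replica` index is decomposed into a coordinate within
    # `replica_shape` via a mixed-radix expansion, with the last dimension
    # varying fastest.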
    t = replica
    pos = []
    for dim in replica_shape[::-1]:
      pos.append(t % dim)
      t //= dim
    replica_pos = np.array(pos[::-1], dtype=np.int32)

    # Determines where that replica starts in each axis.
    outer = replica_pos // computation_stride
    inner = replica_pos % computation_stride
    replica_offsets[replica, :] = outer * computation_footprint + inner

  # Computes a complete logical core -> physical core mapping for each replica.
  indices = [
      np.arange(0, computation_shape[i] * computation_stride[i],
                computation_stride[i]) for i in xrange(topology_rank)
  ]
  indices = np.concatenate(
      [i[..., np.newaxis] for i in np.meshgrid(*indices, indexing="ij")],
      axis=-1)
  indices = indices.reshape((-1, topology_rank))
  assignment = indices + replica_offsets[:, np.newaxis, :]
  return DeviceAssignment(topology, core_assignment=assignment)
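
# Example usage (a minimal sketch, not part of the library API): it assumes a
# TPU system has already been initialized and that `topology_proto` is an
# illustrative variable holding the serialized `TopologyProto` obtained by
# evaluating the `initialize_system()` op.
#
#   topology = Topology(serialized=topology_proto)
#   assignment = DeviceAssignment.build(topology, num_replicas=2)
#   assignment.num_replicas                          # -> 2
#   assignment.tpu_device(replica=0)                 # TPU device name for
#                                                    # replica 0, logical core 0
#   assignment.lookup_replicas(task_id=0, logical_core=0)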