# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Library of TPU helper functions."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math
import numpy as np
from six.moves import xrange  # pylint: disable=redefined-builtin

from tensorflow.python.tpu.topology import Topology
from tensorflow.python.util.tf_export import tf_export


SINGLE_CORE_ASSIGNMENT = [[[0, 0, 0]]]


def _compute_task_and_cores_to_replicas(core_assignment, topology):
  """Computes a nested dict which maps task and logical core to replicas."""
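  # Illustrative (hypothetical) example of the returned structure, which maps
  # task_id -> logical_core -> sorted replica ids; e.g. two single-core
  # replicas placed on two different tasks yield:
  #   {0: {0: [0]}, 1: {0: [1]}}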
  task_and_cores_to_replicas = {}
  for replica in xrange(core_assignment.shape[0]):
    for logical_core in xrange(core_assignment.shape[1]):
      coordinates = core_assignment[replica, logical_core, :]
      task_id = topology.task_ordinal_at_coordinates(coordinates)
      if task_id not in task_and_cores_to_replicas:
        task_and_cores_to_replicas[task_id] = {}
      if logical_core not in task_and_cores_to_replicas[task_id]:
        task_and_cores_to_replicas[task_id][logical_core] = set()

      task_and_cores_to_replicas[task_id][logical_core].add(replica)

  task_to_sorted_replica_id = {}

  for task, core_to_replicas in task_and_cores_to_replicas.items():
    core_to_sorted_replicas = {}
    for core, replicas in core_to_replicas.items():
      core_to_sorted_replicas[core] = sorted(replicas)

    task_to_sorted_replica_id[task] = core_to_sorted_replicas
  return task_to_sorted_replica_id


@tf_export("tpu.experimental.DeviceAssignment")
class DeviceAssignment(object):
  """Mapping from logical cores in a computation to the physical TPU topology.

  Prefer to use the `DeviceAssignment.build()` helper to construct a
  `DeviceAssignment`; it is easier, if less flexible, than constructing a
  `DeviceAssignment` directly.
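
  A minimal usage sketch (illustrative only; it assumes the TPU system has
  been initialized and that `topology` holds the resulting `Topology`):

    device_assignment = tf.tpu.experimental.DeviceAssignment.build(
        topology, num_replicas=2)
    tpu_device_0 = device_assignment.tpu_device(replica=0, logical_core=0)
    host_device_0 = device_assignment.host_device(replica=0, logical_core=0)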
  """

  def __init__(self, topology, core_assignment):
    """Constructs a `DeviceAssignment` object.

    Args:
      topology: A `Topology` object that describes the physical TPU topology.
      core_assignment: A logical to physical core mapping, represented as a
        rank 3 numpy array. See the description of the `core_assignment`
        property for more details.

    Raises:
      ValueError: If `topology` is not a `Topology` object.
      ValueError: If `core_assignment` is not a rank 3 numpy array.
    """
    if not isinstance(topology, Topology):
      raise ValueError("topology must be a Topology object, got {}".format(
          type(topology)))
    core_assignment = np.asarray(core_assignment, dtype=np.int32)

    self._topology = topology

    if core_assignment.ndim != 3:
      raise ValueError("core_assignment must be a rank 3 numpy array, "
                       "got shape {}".format(core_assignment.shape))

    self._num_replicas = core_assignment.shape[0]
    self._num_cores_per_replica = core_assignment.shape[1]

    if core_assignment.shape[-1] != topology.mesh_rank:
      raise ValueError(
          "minor dimension of core_assignment must have size equal to topology "
          "rank ({}), got shape {}".format(topology.mesh_rank,
                                           core_assignment.shape))

    self._core_assignment = core_assignment
    self._task_and_cores_to_replicas = _compute_task_and_cores_to_replicas(
        self._core_assignment, topology)

  @property
  def topology(self):
    """A `Topology` that describes the TPU topology."""
    return self._topology

  @property
  def num_cores_per_replica(self):
    """The number of cores per replica."""
    return self._num_cores_per_replica

  @property
  def num_replicas(self):
    """The number of replicas of the computation."""
    return self._num_replicas

  @property
  def core_assignment(self):
    """The logical to physical core mapping.

    Returns:
      An integer numpy array of rank 3, with shape
      `[num_replicas, num_cores_per_replica, topology_rank]`. Maps
      (replica, logical core) pairs to physical topology coordinates.
    """
    return self._core_assignment

  def coordinates(self, replica, logical_core):
    """Returns the physical topology coordinates of a logical core."""
    return tuple(self.core_assignment[replica, logical_core, :])

  def lookup_replicas(self, task_id, logical_core):
    """Lookup replica ids by task number and logical core.

    Args:
      task_id: TensorFlow task number.
      logical_core: An integer, identifying a logical core.

    Returns:
      A sorted list of the replicas that are attached to that task and
      logical_core.

    Raises:
      ValueError: If no replica in the task contains the given logical core.
    """
    try:
      return self._task_and_cores_to_replicas[task_id][logical_core]
    except KeyError:
      raise ValueError(
          "Cannot find any replica in task {} that contains logical_core "
          "{}".format(task_id, logical_core))

  def tpu_ordinal(self, replica=0, logical_core=0):
    """Returns the ordinal of the TPU device assigned to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.tpu_device_ordinal_at_coordinates(coordinates)

  def host_device(self, replica=0, logical_core=0, job=None):
    """Returns the CPU device attached to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.cpu_device_name_at_coordinates(coordinates, job=job)

  def tpu_device(self, replica=0, logical_core=0, job=None):
    """Returns the name of the TPU device assigned to a logical core."""
    coordinates = self.coordinates(replica, logical_core)
    return self._topology.tpu_device_name_at_coordinates(coordinates, job=job)

  @staticmethod
  def build(topology,
            computation_shape=None,
            computation_stride=None,
            num_replicas=1):
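    """Builds a `DeviceAssignment`; see `device_assignment()` for details."""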
    return device_assignment(topology, computation_shape, computation_stride,
                             num_replicas)


def device_assignment(topology,
                      computation_shape=None,
                      computation_stride=None,
                      num_replicas=1):
  """Computes a device_assignment of a computation across a TPU topology.

  Attempts to choose a compact grid of cores for locality.

  Returns a `DeviceAssignment` that describes the cores in the topology assigned
  to each core of each replica.

  `computation_shape` and `computation_stride` values should be powers of 2 for
  optimal packing.

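  For example, the following sketch (illustrative only; it assumes `topology`
  describes a 2x2 TPU slice with mesh shape `[2, 2, 2]`) gives each replica
  both cores of one chip and packs four such replicas into the mesh:

    assignment = device_assignment(
        topology, computation_shape=[1, 1, 2], num_replicas=4)
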
  Args:
    topology: A `Topology` object that describes the TPU cluster topology.
      To obtain a TPU topology, evaluate the `Tensor` returned by
      `initialize_system` using `Session.run`. Either a serialized
      `TopologyProto` or a `Topology` object may be passed. Note: you must
      evaluate the `Tensor` first; you cannot pass an unevaluated `Tensor` here.
    computation_shape: A rank 1 int32 numpy array with size equal to the
      topology rank, describing the shape of the computation's block of cores.
      If None, the `computation_shape` is `[1] * topology_rank`.
    computation_stride: A rank 1 int32 numpy array of size `topology_rank`,
      describing the inter-core spacing of the `computation_shape` cores in the
      TPU topology. If None, the `computation_stride` is `[1] * topology_rank`.
    num_replicas: The number of computation replicas to run. The replicas will
      be packed into the free spaces of the topology.

  Returns:
    A `DeviceAssignment` object, which describes the mapping between the logical
    cores in each computation replica and the physical cores in the TPU
    topology.

  Raises:
    ValueError: If `topology` is not a valid `Topology` object.
    ValueError: If `computation_shape` or `computation_stride` are not 1D int32
      numpy arrays of shape `[topology_rank]` with all values positive.
    ValueError: If the computation's replicas cannot fit into the TPU topology.
  """
  # Deserialize the Topology proto, if it is a string.
  if isinstance(topology, bytes):
    topology = Topology(serialized=topology)

  if not isinstance(topology, Topology):
    raise ValueError("`topology` is not a Topology object; got {}".format(
        type(topology)))

  topology_rank = len(topology.mesh_shape)
  mesh_shape = topology.mesh_shape
  if computation_shape is None:
    computation_shape = np.array([1] * topology_rank, dtype=np.int32)
  else:
    computation_shape = np.asarray(computation_shape, dtype=np.int32)

  if computation_stride is None:
    computation_stride = np.array([1] * topology_rank, dtype=np.int32)
  else:
    computation_stride = np.asarray(computation_stride, dtype=np.int32)

  if computation_shape.shape != (topology_rank,):
    raise ValueError("computation_shape must have shape [{}]; got {}".format(
        topology_rank, computation_shape.shape))
  if computation_stride.shape != (topology_rank,):
    raise ValueError("computation_stride must have shape [{}]; got {}".format(
        topology_rank, computation_stride.shape))

  if any(computation_shape < 1):
    raise ValueError(
        "computation_shape must be positive; got computation_shape={}".format(
            computation_shape))
  if any(computation_stride < 1):
    raise ValueError(
        "computation_stride must be positive; got computation_stride={}".format(
            computation_stride))

  # Computes the physical size of one computation instance.
  computation_footprint = computation_shape * computation_stride
  if any(computation_footprint > mesh_shape):
    raise ValueError(
        "computation footprint {} does not fit in TPU topology shape {}".format(
            computation_footprint, mesh_shape))

  # Computes how many copies of the computation footprint fit in the mesh.
  block_counts = mesh_shape // computation_footprint

  replica_counts = block_counts * computation_stride
  max_replicas = np.prod(replica_counts)
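  # Illustrative (hypothetical) example: with mesh_shape = [4, 4, 2],
  # computation_shape = [2, 2, 1], and computation_stride = [1, 1, 1], the
  # footprint is [2, 2, 1], block_counts = [2, 2, 2], and at most
  # 2 * 2 * 2 = 8 replicas fit in the mesh.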
  if num_replicas > max_replicas:
    raise ValueError(
        "requested {} replicas but only {} replicas with shape {} and "
        "computation_stride {} fit in a TPU mesh of shape {}".format(
            num_replicas, max_replicas, computation_shape, computation_stride,
            mesh_shape))

  def ceil_of_ratio(n, m):
    return (n + m - 1) // m

  replica_shape = [0] * topology_rank
  if num_replicas > 0:
    remaining_replicas = num_replicas
    remaining_dims = topology_rank

    # Choose dimensions as close to an equal cube as possible, in order of
    # increasing dimension size. By visiting dimensions in increasing size, we
    # assign the most constrained dimension first, so we won't make infeasible
    # choices.
    #
    # As a secondary sort order, visit the dimensions in reverse order. This
    # means we try to use both cores on the same chip in preference to two cores
    # on different chips.
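    #
    # Illustrative (hypothetical) example: with replica_counts = [2, 4, 2] and
    # num_replicas = 8, the loop below visits the axes as sizes 2 (the core
    # axis), 2, 4 and chooses replica_shape = [2, 2, 2].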
    for x, ni in sorted(((x, -i) for (i, x) in enumerate(replica_counts))):
      i = -ni
      target_size = int(math.ceil(remaining_replicas**(1.0 / remaining_dims)))
      replica_shape[i] = min(target_size, x)
      remaining_replicas = ceil_of_ratio(remaining_replicas, replica_shape[i])
      remaining_dims -= 1

    assert remaining_replicas == 1 and remaining_dims == 0

  # Assigns an offset to each replica such that no two replicas overlap.
  replica_offsets = np.full([num_replicas, topology_rank], -1, dtype=np.int32)
  for replica in xrange(num_replicas):
    # Chooses a replica number in each axis.
    t = replica
    pos = []
    for dim in replica_shape[::-1]:
      pos.append(t % dim)
      t //= dim
    replica_pos = np.array(pos[::-1], dtype=np.int32)

    # Determines where that replica starts in each axis.
    outer = replica_pos // computation_stride
    inner = replica_pos % computation_stride
    replica_offsets[replica, :] = outer * computation_footprint + inner

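  # Illustrative (hypothetical) example: with replica_shape = [2, 2, 2] and
  # computation_stride = [1, 1, 1], replica 5 decodes to replica_pos =
  # [1, 0, 1], i.e. its block is offset by one computation footprint along
  # axes 0 and 2.
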
  # Computes a complete logical core -> physical core mapping for each replica.
  indices = [
      np.arange(0, computation_shape[i] * computation_stride[i],
                computation_stride[i]) for i in xrange(topology_rank)
  ]
  indices = np.concatenate(
      [i[..., np.newaxis] for i in np.meshgrid(*indices, indexing="ij")],
      axis=-1)
  indices = indices.reshape((-1, topology_rank))
  assignment = indices + replica_offsets[:, np.newaxis, :]
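  # Illustrative (hypothetical) example: with computation_shape = [1, 1, 2]
  # and computation_stride = [1, 1, 1], `indices` is [[0, 0, 0], [0, 0, 1]],
  # and broadcasting it against `replica_offsets[:, np.newaxis, :]` yields a
  # [num_replicas, 2, 3] array of physical coordinates.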
  return DeviceAssignment(topology, core_assignment=assignment)