1# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Gaussian mixture models Operations."""
16# TODO(xavigonzalvo): Factor out covariance matrix operations to make
17# code reusable for different types (e.g. diag).
18
19from __future__ import absolute_import
20from __future__ import division
21from __future__ import print_function
22
23import numpy as np
24
25from tensorflow.python.framework import constant_op
26from tensorflow.python.framework import dtypes
27from tensorflow.python.framework import ops
28from tensorflow.python.ops import array_ops
29from tensorflow.python.ops import check_ops
30from tensorflow.python.ops import control_flow_ops
31from tensorflow.python.ops import linalg_ops
32from tensorflow.python.ops import math_ops
33from tensorflow.python.ops import random_ops
34from tensorflow.python.ops import state_ops
35from tensorflow.python.ops import variable_scope
36from tensorflow.python.ops import variables
37from tensorflow.python.ops.embedding_ops import embedding_lookup
38
39# Machine epsilon.
40MEPS = np.finfo(float).eps
41FULL_COVARIANCE = 'full'
42DIAG_COVARIANCE = 'diag'
43
44
45def _covariance(x, diag):
46  """Defines the covariance operation of a matrix.
47
48  Args:
49    x: a matrix Tensor. Dimension 0 should contain the number of examples.
50    diag: if True, it computes the diagonal covariance.
51
52  Returns:
53    A Tensor representing the covariance of x. In the case of
54  diagonal matrix just the diagonal is returned.
55  """
56  num_points = math_ops.cast(array_ops.shape(x)[0], dtypes.float32)
57  x -= math_ops.reduce_mean(x, 0, keepdims=True)
58  if diag:
59    cov = math_ops.reduce_sum(
60        math_ops.square(x), 0, keepdims=True) / (num_points - 1)
61  else:
62    cov = math_ops.matmul(x, x, transpose_a=True) / (num_points - 1)
63  return cov
64
65
66def _init_clusters_random(data, num_clusters, random_seed):
67  """Does random initialization of clusters.
68
69  Args:
70    data: a list of Tensors with a matrix of data, each row is an example.
71    num_clusters: an integer with the number of clusters.
72    random_seed: Seed for PRNG used to initialize seeds.
73
74  Returns:
75    A Tensor with num_clusters random rows of data.
76  """
77  assert isinstance(data, list)
78  num_data = math_ops.add_n([array_ops.shape(inp)[0] for inp in data])
79  with ops.control_dependencies(
80      [check_ops.assert_less_equal(num_clusters, num_data)]):
81    indices = random_ops.random_uniform(
82        [num_clusters],
83        minval=0,
84        maxval=math_ops.cast(num_data, dtypes.int64),
85        seed=random_seed,
86        dtype=dtypes.int64)
87  indices %= math_ops.cast(num_data, dtypes.int64)
88  clusters_init = embedding_lookup(data, indices, partition_strategy='div')
89  return clusters_init
90
91
92class GmmAlgorithm(object):
93  """Tensorflow Gaussian mixture model clustering class."""
94  CLUSTERS_WEIGHT = 'alphas'
95  CLUSTERS_VARIABLE = 'clusters'
96  CLUSTERS_COVS_VARIABLE = 'clusters_covs'
97
98  def __init__(self,
99               data,
100               num_classes,
101               initial_means=None,
102               params='wmc',
103               covariance_type=FULL_COVARIANCE,
104               random_seed=0):
105    """Constructor.
106
107    Args:
108      data: a list of Tensors with data, each row is a new example.
109      num_classes: number of clusters.
110      initial_means: a Tensor with a matrix of means. If None, means are
111        computed by sampling randomly.
112      params: Controls which parameters are updated in the training
113        process. Can contain any combination of "w" for weights, "m" for
114        means, and "c" for covariances.
115      covariance_type: one of "full", "diag".
116      random_seed: Seed for PRNG used to initialize seeds.
117
118    Raises:
119      Exception if covariance type is unknown.
120    """
121    self._params = params
122    self._random_seed = random_seed
123    self._covariance_type = covariance_type
124    if self._covariance_type not in [DIAG_COVARIANCE, FULL_COVARIANCE]:
125      raise Exception(  # pylint: disable=g-doc-exception
126          'programmer error: Invalid covariance type: %s' %
127          self._covariance_type)
128    # Create sharded variables for multiple shards. The following
129    # lists are indexed by shard.
130    # Probability per example in a class.
131    num_shards = len(data)
132    self._probs = [None] * num_shards
133    # Prior probability.
134    self._prior_probs = [None] * num_shards
135    # Membership weights w_{ik} where "i" is the i-th example and "k"
136    # is the k-th mixture.
137    self._w = [None] * num_shards
138    # Number of examples in a class.
139    self._points_in_k = [None] * num_shards
140    first_shard = data[0]
141    self._dimensions = array_ops.shape(first_shard)[1]
142    self._num_classes = num_classes
143    # Small value to guarantee that covariances are invertible.
144    self._min_var = array_ops.diag(
145        array_ops.ones(array_ops.stack([self._dimensions]))) * 1e-3
146    self._create_variables()
147    self._initialize_variables(data, initial_means)
148    # Operations of partial statistics for the computation of the means.
149    self._w_mul_x = []
150    # Operations of partial statistics for the computation of the covariances.
151    self._w_mul_x2 = []
152    self._define_graph(data)
153
154  def _create_variables(self):
155    """Initializes GMM algorithm."""
156    init_value = array_ops.constant([], dtype=dtypes.float32)
157    self._means = variables.VariableV1(init_value,
158                                       name=self.CLUSTERS_VARIABLE,
159                                       validate_shape=False)
160    self._covs = variables.VariableV1(
161        init_value, name=self.CLUSTERS_COVS_VARIABLE, validate_shape=False)
162    # Mixture weights, representing the probability that a randomly
163    # selected unobservable data (in EM terms) was generated by component k.
164    self._alpha = variable_scope.variable(
165        array_ops.tile([1.0 / self._num_classes], [self._num_classes]),
166        name=self.CLUSTERS_WEIGHT,
167        validate_shape=False)
168    self._cluster_centers_initialized = variables.VariableV1(False,
169                                                             dtype=dtypes.bool,
170                                                             name='initialized')
171
172  def _initialize_variables(self, data, initial_means=None):
173    """Initializes variables.
174
175    Args:
176      data: a list of Tensors with data, each row is a new example.
177      initial_means: a Tensor with a matrix of means.
178    """
179    first_shard = data[0]
180    # Initialize means: num_classes X 1 X dimensions.
181    if initial_means is not None:
182      means = array_ops.expand_dims(initial_means, 1)
183    else:
184      # Sample data randomly
185      means = array_ops.expand_dims(
186          _init_clusters_random(data, self._num_classes, self._random_seed), 1)
187
188    # Initialize covariances.
189    if self._covariance_type == FULL_COVARIANCE:
190      cov = _covariance(first_shard, False) + self._min_var
191      # A matrix per class, num_classes X dimensions X dimensions
192      covs = array_ops.tile(
193          array_ops.expand_dims(cov, 0), [self._num_classes, 1, 1])
194    elif self._covariance_type == DIAG_COVARIANCE:
195      cov = _covariance(first_shard, True) + self._min_var
196      # A diagonal per row, num_classes X dimensions.
197      covs = array_ops.tile(
198          array_ops.expand_dims(array_ops.diag_part(cov), 0),
199          [self._num_classes, 1])
200
201    with ops.colocate_with(self._cluster_centers_initialized):
202      initialized = control_flow_ops.with_dependencies(
203          [means, covs],
204          array_ops.identity(self._cluster_centers_initialized))
205    self._init_ops = []
206    with ops.colocate_with(self._means):
207      init_means = state_ops.assign(self._means, means, validate_shape=False)
208      init_means = control_flow_ops.with_dependencies(
209          [init_means],
210          state_ops.assign(self._cluster_centers_initialized, True))
211      self._init_ops.append(control_flow_ops.cond(initialized,
212                                                  control_flow_ops.no_op,
213                                                  lambda: init_means).op)
214    with ops.colocate_with(self._covs):
215      init_covs = state_ops.assign(self._covs, covs, validate_shape=False)
216      init_covs = control_flow_ops.with_dependencies(
217          [init_covs],
218          state_ops.assign(self._cluster_centers_initialized, True))
219      self._init_ops.append(control_flow_ops.cond(initialized,
220                                                  control_flow_ops.no_op,
221                                                  lambda: init_covs).op)
222
223  def init_ops(self):
224    """Returns the initialization operation."""
225    return control_flow_ops.group(*self._init_ops)
226
227  def training_ops(self):
228    """Returns the training operation."""
229    return control_flow_ops.group(*self._train_ops)
230
231  def is_initialized(self):
232    """Returns a boolean operation for initialized variables."""
233    return self._cluster_centers_initialized
234
235  def alphas(self):
236    return self._alpha
237
238  def clusters(self):
239    """Returns the clusters with dimensions num_classes X 1 X num_dimensions."""
240    return self._means
241
242  def covariances(self):
243    """Returns the covariances matrices."""
244    return self._covs
245
246  def assignments(self):
247    """Returns a list of Tensors with the matrix of assignments per shard."""
248    ret = []
249    for w in self._w:
250      ret.append(math_ops.argmax(w, 1))
251    return ret
252
253  def scores(self):
254    """Returns the per-sample likelihood fo the data.
255
256    Returns:
257      Log probabilities of each data point.
258    """
259    return self._scores
260
261  def log_likelihood_op(self):
262    """Returns the log-likelihood operation."""
263    return self._log_likelihood_op
264
265  def _define_graph(self, data):
266    """Define graph for a single iteration.
267
268    Args:
269      data: a list of Tensors defining the training data.
270    """
271    for shard_id, shard in enumerate(data):
272      self._num_examples = array_ops.shape(shard)[0]
273      shard = array_ops.expand_dims(shard, 0)
274      self._define_log_prob_operation(shard_id, shard)
275      self._define_prior_log_prob_operation(shard_id)
276      self._define_expectation_operation(shard_id)
277      self._define_partial_maximization_operation(shard_id, shard)
278    self._define_maximization_operation(len(data))
279    self._define_loglikelihood_operation()
280    self._define_score_samples()
281
282  def _define_full_covariance_probs(self, shard_id, shard):
283    """Defines the full covariance probabilities per example in a class.
284
285    Updates a matrix with dimension num_examples X num_classes.
286
287    Args:
288      shard_id: id of the current shard.
289      shard: current data shard, 1 X num_examples X dimensions.
290    """
291    diff = shard - self._means
292    cholesky = linalg_ops.cholesky(self._covs + self._min_var)
293    log_det_covs = 2.0 * math_ops.reduce_sum(
294        math_ops.log(array_ops.matrix_diag_part(cholesky)), 1)
295    x_mu_cov = math_ops.square(
296        linalg_ops.matrix_triangular_solve(
297            cholesky, array_ops.transpose(
298                diff, perm=[0, 2, 1]), lower=True))
299    diag_m = array_ops.transpose(math_ops.reduce_sum(x_mu_cov, 1))
300    self._probs[shard_id] = (
301        -0.5 * (diag_m + math_ops.cast(self._dimensions, dtypes.float32) *
302                math_ops.log(2 * np.pi) + log_det_covs))
303
304  def _define_diag_covariance_probs(self, shard_id, shard):
305    """Defines the diagonal covariance probabilities per example in a class.
306
307    Args:
308      shard_id: id of the current shard.
309      shard: current data shard, 1 X num_examples X dimensions.
310
311    Returns a matrix num_examples * num_classes.
312    """
313    # num_classes X 1
314    # TODO(xavigonzalvo): look into alternatives to log for
315    # reparametrization of variance parameters.
316    det_expanded = math_ops.reduce_sum(
317        math_ops.log(self._covs + 1e-3), 1, keepdims=True)
318    x2 = math_ops.squared_difference(shard, self._means)
319    cov_expanded = array_ops.expand_dims(1.0 / (self._covs + 1e-3), 2)
320    # num_classes X num_examples
321    x2_cov = math_ops.matmul(x2, cov_expanded)
322    x2_cov = array_ops.transpose(array_ops.squeeze(x2_cov, [2]))
323    self._probs[shard_id] = -0.5 * (
324        math_ops.cast(self._dimensions, dtypes.float32) *
325        math_ops.log(2.0 * np.pi) +
326        array_ops.transpose(det_expanded) + x2_cov)
327
328  def _define_log_prob_operation(self, shard_id, shard):
329    """Probability per example in a class.
330
331    Updates a matrix with dimension num_examples X num_classes.
332
333    Args:
334      shard_id: id of the current shard.
335      shard: current data shard, 1 X num_examples X dimensions.
336    """
337    # TODO(xavigonzalvo): Use the pdf defined in
338    # third_party/tensorflow/contrib/distributions/python/ops/gaussian.py
339    if self._covariance_type == FULL_COVARIANCE:
340      self._define_full_covariance_probs(shard_id, shard)
341    elif self._covariance_type == DIAG_COVARIANCE:
342      self._define_diag_covariance_probs(shard_id, shard)
343    self._probs[shard_id] += math_ops.log(self._alpha)
344
345  def _define_prior_log_prob_operation(self, shard_id):
346    """Computes the prior probability of all samples.
347
348    Updates a vector where each item is the prior probability of an
349    input example.
350
351    Args:
352      shard_id: id of current shard_id.
353    """
354    self._prior_probs[shard_id] = math_ops.reduce_logsumexp(
355        self._probs[shard_id], axis=1, keepdims=True)
356
357  def _define_expectation_operation(self, shard_id):
358    # Shape broadcasting.
359    probs = array_ops.expand_dims(self._probs[shard_id], 0)
360    # Membership weights are computed as:
361    # $$w_{ik} = \frac{\alpha_k f(\mathbf{y_i}|\mathbf{\theta}_k)}$$
362    # $$            {\sum_{m=1}^{K}\alpha_mf(\mathbf{y_i}|\mathbf{\theta}_m)}$$
363    # where "i" is the i-th example, "k" is the k-th mixture, theta are
364    # the model parameters and y_i the observations.
365    # These are defined for each shard.
366    self._w[shard_id] = array_ops.reshape(
367        math_ops.exp(probs - self._prior_probs[shard_id]),
368        array_ops.stack([self._num_examples, self._num_classes]))
369
370  def _define_partial_maximization_operation(self, shard_id, shard):
371    """Computes the partial statistics of the means and covariances.
372
373    Args:
374      shard_id: current shard id.
375      shard: current data shard, 1 X num_examples X dimensions.
376    """
377    # Soft assignment of each data point to each of the two clusters.
378    self._points_in_k[shard_id] = math_ops.reduce_sum(
379        self._w[shard_id], 0, keepdims=True)
380    # Partial means.
381    w_mul_x = array_ops.expand_dims(
382        math_ops.matmul(
383            self._w[shard_id], array_ops.squeeze(shard, [0]), transpose_a=True),
384        1)
385    self._w_mul_x.append(w_mul_x)
386    # Partial covariances.
387    x = array_ops.concat([shard for _ in range(self._num_classes)], 0)
388    x_trans = array_ops.transpose(x, perm=[0, 2, 1])
389    x_mul_w = array_ops.concat([
390        array_ops.expand_dims(x_trans[k, :, :] * self._w[shard_id][:, k], 0)
391        for k in range(self._num_classes)
392    ], 0)
393    self._w_mul_x2.append(math_ops.matmul(x_mul_w, x))
394
395  def _define_maximization_operation(self, num_batches):
396    """Maximization operations."""
397    # TODO(xavigonzalvo): some of these operations could be moved to C++.
398    # Compute the effective number of data points assigned to component k.
399    with ops.control_dependencies(self._w):
400      points_in_k = array_ops.squeeze(
401          math_ops.add_n(self._points_in_k), axis=[0])
402      # Update alpha.
403      if 'w' in self._params:
404        final_points_in_k = points_in_k / num_batches
405        num_examples = math_ops.cast(math_ops.reduce_sum(final_points_in_k),
406                                     dtypes.float32)
407        self._alpha_op = self._alpha.assign(final_points_in_k /
408                                            (num_examples + MEPS))
409      else:
410        self._alpha_op = control_flow_ops.no_op()
411      self._train_ops = [self._alpha_op]
412
413      # Update means.
414      points_in_k_expanded = array_ops.reshape(points_in_k,
415                                               [self._num_classes, 1, 1])
416      if 'm' in self._params:
417        self._means_op = self._means.assign(
418            math_ops.div(
419                math_ops.add_n(self._w_mul_x), points_in_k_expanded + MEPS))
420      else:
421        self._means_op = control_flow_ops.no_op()
422      # means are (num_classes x 1 x dims)
423
424      # Update covariances.
425      with ops.control_dependencies([self._means_op]):
426        b = math_ops.add_n(self._w_mul_x2) / (points_in_k_expanded + MEPS)
427        new_covs = []
428        for k in range(self._num_classes):
429          mean = self._means.value()[k, :, :]
430          square_mean = math_ops.matmul(mean, mean, transpose_a=True)
431          new_cov = b[k, :, :] - square_mean + self._min_var
432          if self._covariance_type == FULL_COVARIANCE:
433            new_covs.append(array_ops.expand_dims(new_cov, 0))
434          elif self._covariance_type == DIAG_COVARIANCE:
435            new_covs.append(
436                array_ops.expand_dims(array_ops.diag_part(new_cov), 0))
437        new_covs = array_ops.concat(new_covs, 0)
438        if 'c' in self._params:
439          # Train operations don't need to take care of the means
440          # because covariances already depend on it.
441          with ops.control_dependencies([self._means_op, new_covs]):
442            self._train_ops.append(
443                state_ops.assign(
444                    self._covs, new_covs, validate_shape=False))
445
446  def _define_loglikelihood_operation(self):
447    """Defines the total log-likelihood of current iteration."""
448    op = []
449    for prior_probs in self._prior_probs:
450      op.append(math_ops.reduce_logsumexp(prior_probs))
451    self._log_likelihood_op = math_ops.reduce_logsumexp(op)
452
453  def _define_score_samples(self):
454    """Defines the likelihood of each data sample."""
455    op = []
456    for shard_id, prior_probs in enumerate(self._prior_probs):
457      op.append(prior_probs + math_ops.log(self._w[shard_id]))
458    self._scores = array_ops.squeeze(
459        math_ops.reduce_logsumexp(op, axis=2, keepdims=True), axis=0)
460
461
462def gmm(inp,
463        initial_clusters,
464        num_clusters,
465        random_seed,
466        covariance_type=FULL_COVARIANCE,
467        params='wmc'):
468  """Creates the graph for Gaussian mixture model (GMM) clustering.
469
470  Args:
471    inp: An input tensor or list of input tensors
472    initial_clusters: Specifies the clusters used during
473      initialization. Can be a tensor or numpy array, or a function
474      that generates the clusters. Can also be "random" to specify
475      that clusters should be chosen randomly from input data. Note: type
476      is diverse to be consistent with skflow.
477    num_clusters: number of clusters.
478    random_seed: Python integer. Seed for PRNG used to initialize centers.
479    covariance_type: one of "diag", "full".
480    params: Controls which parameters are updated in the training
481      process. Can contain any combination of "w" for weights, "m" for
482      means, and "c" for covars.
483
484  Returns:
485    Note: tuple of lists returned to be consistent with skflow
486    A tuple consisting of:
487    assignments: A vector (or list of vectors). Each element in the vector
488      corresponds to an input row in 'inp' and specifies the cluster id
489      corresponding to the input.
490    training_op: an op that runs an iteration of training.
491    init_op: an op that runs the initialization.
492  """
493  initial_means = None
494  if initial_clusters != 'random' and not isinstance(initial_clusters,
495                                                     ops.Tensor):
496    initial_means = constant_op.constant(initial_clusters, dtype=dtypes.float32)
497
498  # Implementation of GMM.
499  inp = inp if isinstance(inp, list) else [inp]
500  gmm_tool = GmmAlgorithm(inp, num_clusters, initial_means, params,
501                          covariance_type, random_seed)
502  assignments = gmm_tool.assignments()
503  scores = gmm_tool.scores()
504  loss = gmm_tool.log_likelihood_op()
505  return (loss, scores, [assignments], gmm_tool.training_ops(),
506          gmm_tool.init_ops(), gmm_tool.is_initialized())
507