1# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15# pylint: disable=line-too-long
16"""Proximal stochastic dual coordinate ascent optimizer for linear models (deprecated).
17
18This module and all its submodules are deprecated. To UPDATE or USE linear
19optimizers, please check its latest version in core:
20tensorflow_estimator/python/estimator/canned/linear_optimizer/.
21"""
22# pylint: enable=line-too-long
23from __future__ import absolute_import
24from __future__ import division
25from __future__ import print_function
26
27import collections
28
29from six.moves import range
30
31from tensorflow.contrib.linear_optimizer.python.ops.sharded_mutable_dense_hashtable import ShardedMutableDenseHashTable
32from tensorflow.python.compat import compat
33from tensorflow.python.framework import constant_op
34from tensorflow.python.framework import dtypes
35from tensorflow.python.framework import ops
36from tensorflow.python.framework import tensor_shape
37from tensorflow.python.framework.ops import internal_convert_to_tensor
38from tensorflow.python.framework.ops import name_scope
39from tensorflow.python.ops import array_ops
40from tensorflow.python.ops import control_flow_ops
41from tensorflow.python.ops import data_flow_ops
42from tensorflow.python.ops import gen_sdca_ops
43from tensorflow.python.ops import math_ops
44from tensorflow.python.ops import nn_ops
45from tensorflow.python.ops import state_ops
46from tensorflow.python.ops import variables as var_ops
47from tensorflow.python.ops.nn import log_poisson_loss
48from tensorflow.python.ops.nn import sigmoid_cross_entropy_with_logits
49from tensorflow.python.summary import summary
50from tensorflow.python.util import deprecation
51
# Names exported by `from <this module> import *`; `SdcaModel` is the only
# public symbol defined here.
__all__ = ['SdcaModel']
53
54
55# TODO(sibyl-Aix6ihai): add name_scope to appropriate methods.
class SdcaModel(object):
  """Stochastic dual coordinate ascent solver for linear models.

  Loss functions supported:

     * Binary logistic loss
     * Squared loss
     * Hinge loss
     * Smooth hinge loss
     * Poisson log loss

  This class defines an optimizer API to train a linear model.

  ### Usage

  ```python
  # Create a solver with the desired parameters.
  lr = tf.contrib.linear_optimizer.SdcaModel(examples, variables, options)
  min_op = lr.minimize()
  opt_op = lr.update_weights(min_op)

  predictions = lr.predictions(examples)
  # Primal loss + L1 loss + L2 loss.
  regularized_loss = lr.regularized_loss(examples)
  # Primal loss only
  unregularized_loss = lr.unregularized_loss(examples)

  examples: {
    sparse_features: list of SparseFeatureColumn.
    dense_features: list of dense tensors of type float32.
    example_labels: a tensor of type float32 and shape [Num examples]
    example_weights: a tensor of type float32 and shape [Num examples]
    example_ids: a tensor of type string and shape [Num examples]
  }
  variables: {
    sparse_features_weights: list of tensors of shape [vocab size]
    dense_features_weights: list of tensors of shape [dense_feature_dimension]
  }
  options: {
    symmetric_l1_regularization: 0.0
    symmetric_l2_regularization: 1.0
    loss_type: "logistic_loss"
    num_loss_partitions: 1 (Optional, with default value of 1. Number of
    partitions of the global loss function, 1 means single machine solver,
    and >1 when we have more than one optimizer working concurrently.)
    num_table_shards: 1 (Optional, with default value of 1. Number of shards
    of the internal state table, typically set to match the number of
    parameter servers for large data sets.)
    adaptive: True (Optional, with default value of True. Whether the solver
    performs adaptive sampling.)
  }
  ```

  In the training program you will just have to run the Op returned from
  update_weights(), which itself runs after the Op returned from minimize().

  ```python
  # Execute opt_op and train for num_steps.
  for _ in range(num_steps):
    opt_op.run()

  # You can also check for convergence by calling
  lr.approximate_duality_gap()
  ```
  """

  @deprecation.deprecated(
      None, 'This class is deprecated. To UPDATE or USE linear optimizers, '
      'please check its latest version in core: '
      'tensorflow_estimator/python/estimator/canned/linear_optimizer/.')
  def __init__(self, examples, variables, options):
    """Create a new sdca optimizer.

    Besides validating the inputs, construction creates the unshrinked slot
    variables, the sharded hash table holding per-example solver state, and
    two scalar summaries (`approximate_duality_gap` and `examples_seen`).

    Args:
      examples: Dict of training inputs. Must hold non-`None` values under
        `example_labels`, `example_weights`, `example_ids`, `sparse_features`
        and `dense_features`; the last two must be lists.
      variables: Dict of model weights. Must hold lists under
        `sparse_features_weights` and `dense_features_weights`.
      options: Dict of solver options. Must hold `loss_type` (one of the
        supported losses) and non-negative `symmetric_l1_regularization` and
        `symmetric_l2_regularization`. `num_loss_partitions`,
        `num_table_shards` and `adaptive` are optional.

    Raises:
      ValueError: if an argument is empty, the loss type is unsupported, a
        required entry is `None` or is not a list where one is required, or a
        regularization strength is negative.
    """

    if not examples or not variables or not options:
      raise ValueError('examples, variables and options must all be specified.')

    supported_losses = ('logistic_loss', 'squared_loss', 'hinge_loss',
                        'smooth_hinge_loss', 'poisson_loss')
    if options['loss_type'] not in supported_losses:
      # Format into a single string; passing two positional arguments to
      # ValueError would render the message as a tuple.
      raise ValueError('Unsupported loss_type: %s' % (options['loss_type'],))

    self._assertSpecified([
        'example_labels', 'example_weights', 'example_ids', 'sparse_features',
        'dense_features'
    ], examples)
    self._assertList(['sparse_features', 'dense_features'], examples)

    self._assertSpecified(['sparse_features_weights', 'dense_features_weights'],
                          variables)
    self._assertList(['sparse_features_weights', 'dense_features_weights'],
                     variables)

    self._assertSpecified([
        'loss_type', 'symmetric_l2_regularization',
        'symmetric_l1_regularization'
    ], options)

    for name in ['symmetric_l1_regularization', 'symmetric_l2_regularization']:
      value = options[name]
      if value < 0.0:
        raise ValueError('%s should be non-negative. Found (%f)' %
                         (name, value))

    self._examples = examples
    self._variables = variables
    self._options = options
    self._create_slots()
    self._hashtable = ShardedMutableDenseHashTable(
        key_dtype=dtypes.int64,
        value_dtype=dtypes.float32,
        num_shards=self._num_table_shards(),
        default_value=[0.0, 0.0, 0.0, 0.0],
        # SdcaFprint never returns 0 or 1 for the low64 bits, so this a safe
        # empty_key (that will never collide with actual payloads).
        empty_key=[0, 0],
        deleted_key=[1, 1])

    summary.scalar('approximate_duality_gap', self.approximate_duality_gap())
    summary.scalar('examples_seen', self._hashtable.size())

  def _symmetric_l1_regularization(self):
    """Returns the user-specified symmetric L1 regularization strength."""
    return self._options['symmetric_l1_regularization']

  def _symmetric_l2_regularization(self):
    """Returns the L2 regularization strength used by the solver (>= 1.0)."""
    # Algorithmic requirement (for now) is to have minimal l2 of 1.0.
    return max(self._options['symmetric_l2_regularization'], 1.0)

  def _num_loss_partitions(self):
    """Returns the number of partitions of the global objective (default 1)."""
    # TODO(andreasst): set num_loss_partitions automatically based on the number
    # of workers
    return self._options.get('num_loss_partitions', 1)

  def _adaptive(self):
    """Returns whether to perform adaptive sampling (default True)."""
    return self._options.get('adaptive', True)

  def _num_table_shards(self):
    """Returns the number of hash table shards.

    Yields 1 if the option is not specified or if its value is `None`.
    """
    # TODO(andreasst): set num_table_shards automatically based on the number
    # of parameter servers
    num_shards = self._options.get('num_table_shards')
    return 1 if num_shards is None else num_shards

  def _new_unshrinked_slot(self, primary):
    """Creates a zero-initialized float32 slot variable shaped like `primary`.

    Device placement / colocation is the caller's responsibility; this helper
    only builds the variable inside whatever scope is currently active.

    Args:
      primary: The primary variable the slot shadows.

    Returns:
      The newly created slot variable.
    """
    # TODO(andreasst): remove SDCAOptimizer suffix once bug 30843109 is fixed.
    return var_ops.VariableV1(
        initial_value=array_ops.zeros_like(primary.initialized_value(),
                                           dtypes.float32),
        name=primary.op.name + '_unshrinked/SDCAOptimizer')

  # TODO(sibyl-Aix6ihai): Use optimizer interface to make use of slot creation logic.
  def _create_slots(self):
    """Make unshrinked internal variables (slots)."""
    # Unshrinked variables have the updates before applying L1 regularization.
    # Each unshrinked slot variable is either a `Variable` or list of
    # `Variable`, depending on the value of its corresponding primary variable.
    # We avoid using `PartitionedVariable` for the unshrinked slots since we do
    # not need any of the extra information.
    self._slots = collections.defaultdict(list)
    for name in ['sparse_features_weights', 'dense_features_weights']:
      slot_key = 'unshrinked_' + name
      for var in self._variables[name]:
        # Our primary variable may be either a PartitionedVariable, or a list
        # of Variables (each representing a partition).
        if isinstance(var, (var_ops.PartitionedVariable, list)):
          var_list = []
          for v in var:
            # Each partition's slot lives next to the partition it shadows.
            with ops.colocate_with(v):
              var_list.append(self._new_unshrinked_slot(v))
          self._slots[slot_key].append(var_list)
        else:
          with ops.device(var.device):
            self._slots[slot_key].append(self._new_unshrinked_slot(var))

  def _assertSpecified(self, items, check_in):
    """Checks that every key in `items` maps to a non-`None` value.

    Args:
      items: Iterable of keys to check.
      check_in: Dict that is indexed with each key; a key that is absent
        altogether propagates the lookup's `KeyError`.

    Raises:
      ValueError: if the value stored under any of the keys is `None`.
    """
    for x in items:
      if check_in[x] is None:
        # Report the key name: the value is None here and cannot be
        # concatenated into the message.
        raise ValueError(x + ' must be specified.')

  def _assertList(self, items, check_in):
    """Checks that every key in `items` maps to a `list` in `check_in`.

    Args:
      items: Iterable of keys to check.
      check_in: Dict that is indexed with each key.

    Raises:
      ValueError: if the value stored under any of the keys is not a list.
    """
    for x in items:
      if not isinstance(check_in[x], list):
        raise ValueError(x + ' must be a list.')

  def _var_to_list(self, var):
    """Wraps var in a list if it is not a list or PartitionedVariable."""
    if not isinstance(var, (list, var_ops.PartitionedVariable)):
      var = [var]
    return var

  def _sum_over_weights(self, elementwise_fn):
    """Sums `elementwise_fn(weights)` over all model weights in float64.

    Every sparse and dense weight (each partition separately) is cast to
    float64, transformed by `elementwise_fn` and reduced on the device of the
    weight tensor; the per-variable sums are then added together.

    Args:
      elementwise_fn: Callable applied to each float64 weight tensor.

    Returns:
      A scalar float64 tensor.
    """
    sums = []
    for name in ['sparse_features_weights', 'dense_features_weights']:
      for var in self._variables[name]:
        for v in self._var_to_list(var):
          weights = internal_convert_to_tensor(v)
          with ops.device(weights.device):
            sums.append(
                math_ops.reduce_sum(
                    elementwise_fn(math_ops.cast(weights, dtypes.float64))))
    return math_ops.add_n(sums)

  def _l1_loss(self):
    """Computes the (un-normalized) l1 loss of the model."""
    with name_scope('sdca/l1_loss'):
      # SDCA L1 regularization cost is: l1 * sum(|weights|)
      return (self._symmetric_l1_regularization() *
              self._sum_over_weights(math_ops.abs))

  def _l2_loss(self, l2):
    """Computes the (un-normalized) l2 loss of the model.

    Args:
      l2: The L2 regularization strength to scale the loss with.

    Returns:
      A scalar float64 tensor.
    """
    with name_scope('sdca/l2_loss'):
      # SDCA L2 regularization cost is: l2 * sum(weights^2) / 2
      return l2 * self._sum_over_weights(math_ops.square) / 2.0

  def _convert_n_to_tensor(self, input_list, as_ref=False):
    """Converts input list to a set of tensors.

    Args:
      input_list: List whose elements are tensor-convertible values, lists of
        partitions, or `PartitionedVariable`s.
      as_ref: Forwarded to `internal_convert_to_tensor`.

    Returns:
      A list with one tensor per element of `input_list`.
    """
    # input_list can be a list of Variables (that are implicitly partitioned),
    # in which case the underlying logic in internal_convert_to_tensor will not
    # concatenate the partitions together.  This method takes care of the
    # concatenating (we only allow partitioning on the first axis).
    output_list = []
    for x in input_list:
      tensor_to_convert = x
      if isinstance(x, (list, var_ops.PartitionedVariable)):
        # We only allow for partitioning on the first axis.
        tensor_to_convert = array_ops.concat(x, axis=0)
      output_list.append(internal_convert_to_tensor(
          tensor_to_convert, as_ref=as_ref))
    return output_list

  def _get_first_dimension_size_statically(self, w, num_partitions):
    """Compute the static size of the first dimension for a sharded variable.

    Args:
      w: Indexable collection of partitions.
      num_partitions: Number of leading partitions of `w` to add up.

    Returns:
      The sum of the statically known first dimensions of the partitions.
    """
    dim_0_size = w[0].get_shape()[0]
    for p in range(1, num_partitions):
      dim_0_size += w[p].get_shape()[0]
    return dim_0_size

  def _linear_predictions(self, examples):
    """Returns predictions of the form w*x.

    Args:
      examples: Dict providing `sparse_features` and `dense_features`.

    Returns:
      A tensor holding the sum of the sparse and the (flattened) dense
      contributions.
    """
    with name_scope('sdca/prediction'):
      sparse_variables = self._convert_n_to_tensor(self._variables[
          'sparse_features_weights'])
      result_sparse = 0.0
      for sfc, sv in zip(examples['sparse_features'], sparse_variables):
        # TODO(sibyl-Aix6ihai): following does not take care of missing features.
        contribution = array_ops.gather(sv, sfc.feature_indices)
        # minimize() treats absent feature values as 1.0, in which case the
        # gathered weights already are the per-feature contributions.
        if sfc.feature_values is not None:
          contribution = math_ops.multiply(contribution, sfc.feature_values)
        result_sparse += math_ops.segment_sum(contribution,
                                              sfc.example_indices)
      dense_features = self._convert_n_to_tensor(examples['dense_features'])
      dense_variables = self._convert_n_to_tensor(self._variables[
          'dense_features_weights'])

      result_dense = 0.0
      for i, dense_variable in enumerate(dense_variables):
        result_dense += math_ops.matmul(
            dense_features[i], array_ops.expand_dims(dense_variable, -1))

    # Reshaping to allow shape inference at graph construction time.
    return array_ops.reshape(result_dense, [-1]) + result_sparse

  def predictions(self, examples):
    """Add operations to compute predictions by the model.

    If logistic_loss is being used, predicted probabilities are returned.
    If poisson_loss is being used, predictions are exponentiated.
    Otherwise, (raw) linear predictions (w*x) are returned.

    Args:
      examples: Examples to compute predictions on.

    Returns:
      An Operation that computes the predictions for examples.

    Raises:
      ValueError: if examples are not well defined.
    """
    self._assertSpecified(
        ['example_weights', 'sparse_features', 'dense_features'], examples)
    self._assertList(['sparse_features', 'dense_features'], examples)

    result = self._linear_predictions(examples)
    loss_type = self._options['loss_type']
    if loss_type == 'logistic_loss':
      # Convert logits to probability for logistic loss predictions.
      with name_scope('sdca/logistic_prediction'):
        result = math_ops.sigmoid(result)
    elif loss_type == 'poisson_loss':
      # Exponentiate the prediction for poisson loss predictions.
      with name_scope('sdca/poisson_prediction'):
        result = math_ops.exp(result)
    return result

  def _get_partitioned_update_ops(self,
                                  v_num,
                                  num_partitions_by_var,
                                  p_assignments_by_var,
                                  gather_ids_by_var,
                                  weights,
                                  full_update,
                                  p_assignments=None,
                                  num_partitions=None):
    """Get updates for partitioned variables.

    Args:
      v_num: Index of the sparse variable; key into the `*_by_var` dicts.
      num_partitions_by_var: Dict from variable index to partition count.
      p_assignments_by_var: Dict from variable index to the int32 tensor
        assigning each update row to a partition.
      gather_ids_by_var: Dict from variable index to the list of
        per-partition ids.
      weights: The partitions of the slot variable to update.
      full_update: The update for the whole (unpartitioned) variable.
      p_assignments: Ignored; accepted only for backward compatibility. The
        value from `p_assignments_by_var` is always used.
      num_partitions: Ignored; accepted only for backward compatibility. The
        value from `num_partitions_by_var` is always used.

    Returns:
      A list with one `scatter_add` result per partition.
    """
    del p_assignments, num_partitions  # Superseded by the per-variable dicts.
    var_num_partitions = num_partitions_by_var[v_num]
    var_p_assignments = p_assignments_by_var[v_num]
    gather_ids = gather_ids_by_var[v_num]
    updates = data_flow_ops.dynamic_partition(
        full_update, var_p_assignments, var_num_partitions)
    update_ops = []
    for p in range(var_num_partitions):
      with ops.colocate_with(weights[p]):
        result = state_ops.scatter_add(weights[p], gather_ids[p], updates[p])
      update_ops.append(result)
    return update_ops

  def minimize(self, global_step=None, name=None):
    """Add operations to train a linear model by minimizing the loss function.

    Args:
      global_step: Optional `Variable` to increment by one after the
        variables have been updated.
      name: Optional name for the returned operation.

    Returns:
      An Operation that updates the variables passed in the constructor.
    """
    # Technically, the op depends on a lot more than the variables,
    # but we'll keep the list short.
    with name_scope(name, 'sdca/minimize'):
      sparse_example_indices = []
      sparse_feature_indices = []
      sparse_features_values = []
      for sf in self._examples['sparse_features']:
        sparse_example_indices.append(sf.example_indices)
        sparse_feature_indices.append(sf.feature_indices)
        # If feature values are missing, sdca assumes a value of 1.0f.
        if sf.feature_values is not None:
          sparse_features_values.append(sf.feature_values)

      # pylint: disable=protected-access
      example_ids_hashed = gen_sdca_ops.sdca_fprint(
          internal_convert_to_tensor(self._examples['example_ids']))
      # pylint: enable=protected-access
      example_state_data = self._hashtable.lookup(example_ids_hashed)
      # Solver returns example_state_update, new delta sparse_feature_weights
      # and delta dense_feature_weights.

      sparse_weights = []
      sparse_indices = []
      # If we have partitioned variables, keep a few dictionaries of Tensors
      # around that we need for the assign_add after the op call to
      # gen_sdca_ops.sdca_optimizer().  These are keyed because we may have a
      # mix of partitioned and un-partitioned variables.
      num_partitions_by_var = {}
      p_assignments_by_var = {}
      gather_ids_by_var = {}
      for v_num, (w, feature_indices) in enumerate(
          zip(self._slots['unshrinked_sparse_features_weights'],
              sparse_feature_indices)):
        # Append the sparse_indices (in full-variable space).
        sparse_idx = math_ops.cast(
            array_ops.unique(
                math_ops.cast(feature_indices, dtypes.int32))[0],
            dtypes.int64)
        sparse_indices.append(sparse_idx)
        if isinstance(w, (list, var_ops.PartitionedVariable)):
          num_partitions = len(w)
          flat_ids = array_ops.reshape(sparse_idx, [-1])
          # We use div partitioning, which is easiest to support downstream.
          # Compute num_total_ids as the sum of dim-0 of w, then assign
          # to partitions based on a constant number of ids per partition.
          # Optimize if we already know the full shape statically.
          dim_0_size = self._get_first_dimension_size_statically(
              w, num_partitions)

          if tensor_shape.dimension_value(dim_0_size):
            num_total_ids = constant_op.constant(
                tensor_shape.dimension_value(dim_0_size),
                flat_ids.dtype)
          else:
            dim_0_sizes = []
            for p in range(num_partitions):
              if tensor_shape.dimension_value(w[p].shape[0]) is not None:
                dim_0_sizes.append(tensor_shape.dimension_value(w[p].shape[0]))
              else:
                with ops.colocate_with(w[p]):
                  dim_0_sizes.append(array_ops.shape(w[p])[0])
            num_total_ids = math_ops.reduce_sum(
                math_ops.cast(array_ops.stack(dim_0_sizes), flat_ids.dtype))
          ids_per_partition = num_total_ids // num_partitions
          extras = num_total_ids % num_partitions

          p_assignments = math_ops.maximum(
              flat_ids // (ids_per_partition + 1),
              (flat_ids - extras) // ids_per_partition)

          # Emulate a conditional using a boolean indicator tensor
          new_ids = array_ops.where(p_assignments < extras,
                                    flat_ids % (ids_per_partition + 1),
                                    (flat_ids - extras) % ids_per_partition)

          # Cast partition assignments to int32 for use in dynamic_partition.
          # There really should not be more than 2^32 partitions.
          p_assignments = math_ops.cast(p_assignments, dtypes.int32)
          # Partition list of ids based on assignments into num_partitions
          # separate lists.
          gather_ids = data_flow_ops.dynamic_partition(new_ids,
                                                       p_assignments,
                                                       num_partitions)
          # Add these into the dictionaries for use in the later update.
          num_partitions_by_var[v_num] = num_partitions
          p_assignments_by_var[v_num] = p_assignments
          gather_ids_by_var[v_num] = gather_ids

          # Gather the weights from each partition.
          partition_gathered_weights = []
          for p in range(num_partitions):
            with ops.colocate_with(w[p]):
              partition_gathered_weights.append(
                  array_ops.gather(w[p], gather_ids[p]))

          # Stitch the weights back together in the same order they were before
          # we dynamic_partitioned them.
          condition_indices = data_flow_ops.dynamic_partition(
              math_ops.range(array_ops.shape(new_ids)[0]),
              p_assignments, num_partitions)
          batch_gathered_weights = data_flow_ops.dynamic_stitch(
              condition_indices, partition_gathered_weights)
        else:
          w_as_tensor = internal_convert_to_tensor(w)
          with ops.device(w_as_tensor.device):
            batch_gathered_weights = array_ops.gather(
                w_as_tensor, sparse_idx)
        sparse_weights.append(batch_gathered_weights)

      # Both solver op versions take the same inputs and attrs; they differ
      # only in the spelling of the adaptive-sampling attr, so the shared
      # arguments are built exactly once.
      optimizer_inputs = (
          sparse_example_indices,
          sparse_feature_indices,
          sparse_features_values,
          self._convert_n_to_tensor(self._examples['dense_features']),
          internal_convert_to_tensor(self._examples['example_weights']),
          internal_convert_to_tensor(self._examples['example_labels']),
          sparse_indices,
          sparse_weights,
          self._convert_n_to_tensor(self._slots[
              'unshrinked_dense_features_weights']),
          example_state_data,
      )
      optimizer_attrs = {
          'loss_type': self._options['loss_type'],
          'l1': self._symmetric_l1_regularization(),
          'l2': self._symmetric_l2_regularization(),
          'num_loss_partitions': self._num_loss_partitions(),
          'num_inner_iterations': 1,
      }
      # pylint: disable=protected-access
      if compat.forward_compatible(year=2018, month=10, day=30):
        esu, sfw, dfw = gen_sdca_ops.sdca_optimizer_v2(
            *optimizer_inputs, adaptive=self._adaptive(), **optimizer_attrs)
      else:
        esu, sfw, dfw = gen_sdca_ops.sdca_optimizer(
            *optimizer_inputs, adaptative=self._adaptive(), **optimizer_attrs)
      # pylint: enable=protected-access

      with ops.control_dependencies([esu]):
        update_ops = [self._hashtable.insert(example_ids_hashed, esu)]
        # Update the weights before the proximal step.
        for v_num, (w, idx, u) in enumerate(
            zip(self._slots['unshrinked_sparse_features_weights'],
                sparse_indices, sfw)):
          if isinstance(w, (var_ops.PartitionedVariable, list)):
            update_ops += self._get_partitioned_update_ops(
                v_num, num_partitions_by_var, p_assignments_by_var,
                gather_ids_by_var, w, u)
          else:
            update_ops.append(state_ops.scatter_add(w, idx, u))
        for w, u in zip(self._slots['unshrinked_dense_features_weights'], dfw):
          if isinstance(w, (var_ops.PartitionedVariable, list)):
            split_updates = array_ops.split(
                u, num_or_size_splits=[v.shape.as_list()[0] for v in w])
            for v, split_update in zip(w, split_updates):
              update_ops.append(state_ops.assign_add(v, split_update))
          else:
            update_ops.append(state_ops.assign_add(w, u))
      # Compare against None explicitly rather than relying on the truthiness
      # of a variable or tensor.
      if global_step is None:
        return control_flow_ops.group(*update_ops)
      with ops.control_dependencies(update_ops):
        return state_ops.assign_add(global_step, 1, name=name).op

  def update_weights(self, train_op):
    """Updates the model weights.

    This function must be called on at least one worker after `minimize`.
    In distributed training this call can be omitted on non-chief workers to
    speed up training.

    Args:
      train_op: The operation returned by the `minimize` call.

    Returns:
      An Operation that updates the model weights.
    """
    with ops.control_dependencies([train_op]):
      update_ops = []
      # Copy over unshrinked weights to user provided variables.
      for name in ['sparse_features_weights', 'dense_features_weights']:
        for var, slot_var in zip(self._variables[name],
                                 self._slots['unshrinked_' + name]):
          for v, sv in zip(self._var_to_list(var), self._var_to_list(slot_var)):
            update_ops.append(v.assign(sv))

    # Apply proximal step.
    with ops.control_dependencies(update_ops):
      update_ops = []
      for name in ['sparse_features_weights', 'dense_features_weights']:
        for var in self._variables[name]:
          for v in self._var_to_list(var):
            with ops.device(v.device):
              # pylint: disable=protected-access
              update_ops.append(
                  gen_sdca_ops.sdca_shrink_l1(
                      self._convert_n_to_tensor([v], as_ref=True),
                      l1=self._symmetric_l1_regularization(),
                      l2=self._symmetric_l2_regularization()))
              # pylint: enable=protected-access
      return control_flow_ops.group(*update_ops)

  def approximate_duality_gap(self):
    """Add operations to compute the approximate duality gap.

    Returns:
      An Operation that computes the approximate duality gap over all
      examples.
    """
    with name_scope('sdca/approximate_duality_gap'):
      _, values_list = self._hashtable.export_sharded()
      shard_sums = []
      for values in values_list:
        with ops.device(values.device):
          # For large tables the cast to float64 below allocates a large
          # temporary tensor that is freed once the sum operation completes.
          # To reduce peak memory usage in cases where we have multiple large
          # tables on a single device, we serialize these operations.
          # Note that we need double precision to get accurate results.
          with ops.control_dependencies(shard_sums):
            shard_sums.append(
                math_ops.reduce_sum(math_ops.cast(values, dtypes.float64), 0))
      summed_values = math_ops.add_n(shard_sums)

      # Columns 1..3 of the per-example state hold the primal loss, the dual
      # loss and the example weight respectively.
      primal_loss = summed_values[1]
      dual_loss = summed_values[2]
      example_weights = summed_values[3]
      # Note: we return NaN if there are no weights or all weights are 0, e.g.
      # if no examples have been processed
      return (primal_loss + dual_loss + self._l1_loss() +
              (2.0 * self._l2_loss(self._symmetric_l2_regularization()))
             ) / example_weights

  def unregularized_loss(self, examples):
    """Add operations to compute the loss (without the regularization loss).

    Note that `hinge_loss` and `smooth_hinge_loss` are both reported using the
    plain hinge formula.

    Args:
      examples: Examples to compute unregularized loss on.

    Returns:
      An Operation that computes mean (unregularized) loss for given set of
      examples.

    Raises:
      ValueError: if examples are not well defined.
    """
    self._assertSpecified([
        'example_labels', 'example_weights', 'sparse_features', 'dense_features'
    ], examples)
    self._assertList(['sparse_features', 'dense_features'], examples)
    with name_scope('sdca/unregularized_loss'):
      predictions = math_ops.cast(
          self._linear_predictions(examples), dtypes.float64)
      labels = math_ops.cast(
          internal_convert_to_tensor(examples['example_labels']),
          dtypes.float64)
      weights = math_ops.cast(
          internal_convert_to_tensor(examples['example_weights']),
          dtypes.float64)

      loss_type = self._options['loss_type']

      if loss_type == 'logistic_loss':
        return math_ops.reduce_sum(math_ops.multiply(
            sigmoid_cross_entropy_with_logits(labels=labels,
                                              logits=predictions),
            weights)) / math_ops.reduce_sum(weights)

      if loss_type == 'poisson_loss':
        return math_ops.reduce_sum(math_ops.multiply(
            log_poisson_loss(targets=labels, log_input=predictions),
            weights)) / math_ops.reduce_sum(weights)

      if loss_type in ['hinge_loss', 'smooth_hinge_loss']:
        # hinge_loss = max{0, 1 - y_i w*x} where y_i \in {-1, 1}. So, we need to
        # first convert 0/1 labels into -1/1 labels.
        all_ones = array_ops.ones_like(predictions)
        adjusted_labels = math_ops.subtract(2 * labels, all_ones)
        # Tensor that contains (unweighted) error (hinge loss) per
        # example.
        error = nn_ops.relu(
            math_ops.subtract(all_ones,
                              math_ops.multiply(adjusted_labels, predictions)))
        weighted_error = math_ops.multiply(error, weights)
        return math_ops.reduce_sum(weighted_error) / math_ops.reduce_sum(
            weights)

      # squared loss
      err = math_ops.subtract(labels, predictions)

      weighted_squared_err = math_ops.multiply(math_ops.square(err), weights)
      # SDCA squared loss function is sum(err^2) / (2*sum(weights))
      return (math_ops.reduce_sum(weighted_squared_err) /
              (2.0 * math_ops.reduce_sum(weights)))

  def regularized_loss(self, examples):
    """Add operations to compute the loss with regularization loss included.

    Args:
      examples: Examples to compute loss on.

    Returns:
      An Operation that computes mean (regularized) loss for given set of
      examples.

    Raises:
      ValueError: if examples are not well defined.
    """
    self._assertSpecified([
        'example_labels', 'example_weights', 'sparse_features', 'dense_features'
    ], examples)
    self._assertList(['sparse_features', 'dense_features'], examples)
    with name_scope('sdca/regularized_loss'):
      weights = internal_convert_to_tensor(examples['example_weights'])
      return ((
          self._l1_loss() +
          # Note that here we are using the raw regularization
          # (as specified by the user) and *not*
          # self._symmetric_l2_regularization().
          self._l2_loss(self._options['symmetric_l2_regularization'])) /
              math_ops.reduce_sum(math_ops.cast(weights, dtypes.float64)) +
              self.unregularized_loss(examples))
725