1"""Linear Estimators."""
2#  Copyright 2015 The TensorFlow Authors. All Rights Reserved.
3#
4#  Licensed under the Apache License, Version 2.0 (the "License");
5#  you may not use this file except in compliance with the License.
6#  You may obtain a copy of the License at
7#
8#   http://www.apache.org/licenses/LICENSE-2.0
9#
10#  Unless required by applicable law or agreed to in writing, software
11#  distributed under the License is distributed on an "AS IS" BASIS,
12#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13#  See the License for the specific language governing permissions and
14#  limitations under the License.
15from __future__ import absolute_import
16from __future__ import division
17from __future__ import print_function
18
19from tensorflow.contrib import layers
20from tensorflow.contrib.linear_optimizer.python.ops import sdca_ops
21from tensorflow.contrib.linear_optimizer.python.ops.sparse_feature_column import SparseFeatureColumn
22from tensorflow.python.framework import dtypes
23from tensorflow.python.framework import ops
24from tensorflow.python.ops import array_ops
25from tensorflow.python.ops import control_flow_ops
26from tensorflow.python.ops import math_ops
27
28
29# TODO(sibyl-vie3Poto, sibyl-Aix6ihai): Add proper testing to this wrapper once the API is
30# stable.
class SDCAOptimizer(object):
  """Wrapper class for the SDCA optimizer.

  The wrapper is currently meant for use as an optimizer within a tf.learn
  Estimator.

  Example usage:

  ```python
  real_feature_column = real_valued_column(...)
  sparse_feature_column = sparse_column_with_hash_bucket(...)
  sdca_optimizer = linear.SDCAOptimizer(example_id_column='example_id',
                                        num_loss_partitions=1,
                                        num_table_shards=1,
                                        symmetric_l2_regularization=2.0)
  classifier = tf.contrib.learn.LinearClassifier(
      feature_columns=[real_feature_column, sparse_feature_column],
      weight_column_name=...,
      optimizer=sdca_optimizer)
  classifier.fit(input_fn=input_fn_train, steps=50)
  classifier.evaluate(input_fn=input_fn_eval)
  ```

  Here the expectation is that the `input_fn_*` functions passed to train and
  evaluate return a pair (dict, label_tensor), where the dict has
  `example_id_column` as a key whose value is a `Tensor` of shape [batch_size]
  and dtype string.
  `num_loss_partitions` defines the number of partitions of the global loss
  function and should be set to `(#concurrent train ops per worker)
  x (#workers)`.
  Convergence of the (global) loss is guaranteed if `num_loss_partitions` is
  greater than or equal to the above product. Larger values for
  `num_loss_partitions` lead to slower convergence. The recommended value for
  `num_loss_partitions` in `tf.learn` (where currently there is one process per
  worker) is the number of workers running the train steps. It defaults to 1
  (single machine).
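  For example, with 3 workers each running 2 concurrent train ops, any
  `num_loss_partitions >= 6` guarantees convergence, and 6 converges fastest
  among those values.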
  `num_table_shards` defines the number of shards for the internal state
  table, typically set to match the number of parameter servers for large
  data sets. You can also specify a `partitioner` object to partition the
  primal weights during training (the `div` partitioning strategy will be
  used).
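
  As a minimal sketch (the 'price' feature name below is hypothetical), an
  `input_fn` compatible with the example above could look like:

  ```python
  def input_fn_train():
    return {
        'example_id': tf.constant(['1', '2']),  # dtype string, shape [2]
        'price': tf.constant([[0.4], [0.6]]),   # a real-valued feature
    }, tf.constant([[1], [0]])  # labels
  ```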
69  """
70
71  def __init__(self,
72               example_id_column,
73               num_loss_partitions=1,
74               num_table_shards=None,
75               symmetric_l1_regularization=0.0,
76               symmetric_l2_regularization=1.0,
77               adaptive=True,
78               partitioner=None):
79    self._example_id_column = example_id_column
80    self._num_loss_partitions = num_loss_partitions
81    self._num_table_shards = num_table_shards
82    self._symmetric_l1_regularization = symmetric_l1_regularization
83    self._symmetric_l2_regularization = symmetric_l2_regularization
84    self._adaptive = adaptive
85    self._partitioner = partitioner
86
87  def get_name(self):
88    return 'SDCAOptimizer'
89
90  @property
91  def example_id_column(self):
92    return self._example_id_column
93
94  @property
95  def num_loss_partitions(self):
96    return self._num_loss_partitions
97
98  @property
99  def num_table_shards(self):
100    return self._num_table_shards
101
102  @property
103  def symmetric_l1_regularization(self):
104    return self._symmetric_l1_regularization
105
106  @property
107  def symmetric_l2_regularization(self):
108    return self._symmetric_l2_regularization
109
110  @property
111  def adaptive(self):
112    return self._adaptive
113
114  @property
115  def partitioner(self):
116    return self._partitioner

  def get_train_step(self, columns_to_variables, weight_column_name, loss_type,
                     features, targets, global_step):
    """Returns the training operation of an SdcaModel optimizer."""

    def _dense_tensor_to_sparse_feature_column(dense_tensor):
      """Returns a SparseFeatureColumn for the input dense_tensor."""
      ignore_value = 0.0
      sparse_indices = array_ops.where(
          math_ops.not_equal(dense_tensor,
                             math_ops.cast(ignore_value, dense_tensor.dtype)))
      sparse_values = array_ops.gather_nd(dense_tensor, sparse_indices)
      # TODO(sibyl-Aix6ihai, sibyl-vie3Poto): Make this efficient, as SDCA now supports
      # very sparse features with and without weights.
      return SparseFeatureColumn(
          array_ops.reshape(
              array_ops.split(
                  value=sparse_indices, num_or_size_splits=2, axis=1)[0], [-1]),
          array_ops.reshape(
              array_ops.split(
                  value=sparse_indices, num_or_size_splits=2, axis=1)[1], [-1]),
          array_ops.reshape(math_ops.cast(sparse_values, dtypes.float32), [-1]))

    def _training_examples_and_variables():
      """Returns dictionaries for training examples and variables."""
      batch_size = targets.get_shape()[0]

      # Iterate over all feature columns and create appropriate lists for dense
      # and sparse features as well as dense and sparse weights (variables) for
      # SDCA.
      # TODO(sibyl-vie3Poto): Reshape variables stored as values in column_to_variables
      # dict as 1-dimensional tensors.
      dense_features, sparse_features, sparse_feature_with_values = [], [], []
      dense_feature_weights = []
      sparse_feature_weights, sparse_feature_with_values_weights = [], []
      for column in sorted(columns_to_variables.keys(), key=lambda x: x.key):
        transformed_tensor = features[column]
        if isinstance(column, layers.feature_column._RealValuedColumn):  # pylint: disable=protected-access
          # A real-valued column corresponds to a dense feature in SDCA. A
          # transformed tensor corresponding to a RealValuedColumn should have
          # rank at most 2. In order to be passed to SDCA, its rank needs to be
          # exactly 2 (i.e., its shape should be [batch_size, column.dim]).
          check_rank_op = control_flow_ops.Assert(
              math_ops.less_equal(array_ops.rank(transformed_tensor), 2),
              ['transformed_tensor should have rank at most 2.'])
          # Reshape to [batch_size, dense_column_dimension].
          with ops.control_dependencies([check_rank_op]):
            transformed_tensor = array_ops.reshape(transformed_tensor, [
                array_ops.shape(transformed_tensor)[0], -1
            ])
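          # For example, a [batch_size] tensor becomes [batch_size, 1], while a
          # rank-2 [batch_size, dim] tensor keeps its shape.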

          dense_features.append(transformed_tensor)
          # For real-valued columns, the variables list contains exactly one
          # element.
          dense_feature_weights.append(columns_to_variables[column][0])
        elif isinstance(column, layers.feature_column._BucketizedColumn):  # pylint: disable=protected-access
          # A bucketized column corresponds to a sparse feature in SDCA. The
          # bucketized feature is "sparsified" for SDCA by converting it to a
          # SparseFeatureColumn representing the one-hot encoding of the
          # bucketized feature.
          #
          # TODO(sibyl-vie3Poto): Explore whether it is more efficient to translate a
          # bucketized feature column to a dense feature in SDCA. This will
          # likely depend on the number of buckets.
          dense_bucket_tensor = column._to_dnn_input_layer(transformed_tensor)  # pylint: disable=protected-access
          sparse_feature_column = _dense_tensor_to_sparse_feature_column(
              dense_bucket_tensor)
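          # For illustration (hypothetical values): with 3 buckets, bucket id 1
          # one-hot encodes to [0., 1., 0.], which the helper above turns into
          # a single (example index, bucket index, 1.0) entry.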
          sparse_feature_with_values.append(sparse_feature_column)
          # If a partitioner was used during variable creation, we will have a
          # list of more than one Variable here.
          vars_to_append = columns_to_variables[column][0]
          if len(columns_to_variables[column]) > 1:
            vars_to_append = columns_to_variables[column]
          sparse_feature_with_values_weights.append(vars_to_append)
        elif isinstance(
            column,
            (
                layers.feature_column._WeightedSparseColumn,  # pylint: disable=protected-access
                layers.feature_column._CrossedColumn,  # pylint: disable=protected-access
                layers.feature_column._SparseColumn)):  # pylint: disable=protected-access

          if isinstance(column, layers.feature_column._WeightedSparseColumn):  # pylint: disable=protected-access
            id_tensor = column.id_tensor(transformed_tensor)
            weight_tensor = array_ops.reshape(
                column.weight_tensor(transformed_tensor).values, [-1])
          else:
            id_tensor = transformed_tensor
            weight_tensor = array_ops.ones(
                [array_ops.shape(id_tensor.indices)[0]], dtypes.float32)

          example_ids = array_ops.reshape(id_tensor.indices[:, 0], [-1])

          flat_ids = array_ops.reshape(id_tensor.values, [-1])
          # Prune invalid ids (< 0) from flat_ids, example_ids, and
          # weight_tensor. These can come from looking up an OOV entry in the
          # vocabulary (the default value being -1).
          is_id_valid = math_ops.greater_equal(flat_ids, 0)
          flat_ids = array_ops.boolean_mask(flat_ids, is_id_valid)
          example_ids = array_ops.boolean_mask(example_ids, is_id_valid)
          weight_tensor = array_ops.boolean_mask(weight_tensor, is_id_valid)
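          # For example, with flat_ids [-1, 7] only the entry for id 7 (along
          # with its example id and weight) is kept.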

          projection_length = math_ops.reduce_max(flat_ids) + 1
          # Project ids based on example ids so that we can de-duplicate ids
          # that occur multiple times within a single example.
          projected_ids = projection_length * example_ids + flat_ids
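          # For illustration (hypothetical values): with example_ids [0, 0, 1]
          # and flat_ids [3, 3, 1], projection_length is 4 and projected_ids
          # are [3, 3, 5], so the duplicate id 3 within example 0 collapses to
          # a single entry in the `unique` call below.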

          # Remove any redundant ids.
          ids, idx = array_ops.unique(projected_ids)
          # Keep only one example id per duplicated id.
          example_ids_filtered = math_ops.unsorted_segment_min(
              example_ids, idx,
              array_ops.shape(ids)[0])

          # Reproject the ids back into the feature id space.
          reproject_ids = (ids - projection_length * example_ids_filtered)

          weights = array_ops.reshape(
              math_ops.unsorted_segment_sum(weight_tensor, idx,
                                            array_ops.shape(ids)[0]), [-1])
          sparse_feature_with_values.append(
              SparseFeatureColumn(example_ids_filtered, reproject_ids, weights))
          # If a partitioner was used during variable creation, we will have a
          # list of more than one Variable here.
          vars_to_append = columns_to_variables[column][0]
          if len(columns_to_variables[column]) > 1:
            vars_to_append = columns_to_variables[column]
          sparse_feature_with_values_weights.append(vars_to_append)
        else:
          raise ValueError('SDCAOptimizer does not support column type %s.' %
                           type(column).__name__)

      example_weights = array_ops.reshape(
          features[weight_column_name],
          shape=[-1]) if weight_column_name else array_ops.ones([batch_size])
      example_ids = features[self._example_id_column]
      sparse_feature_with_values.extend(sparse_features)
      sparse_feature_with_values_weights.extend(sparse_feature_weights)
      examples = dict(
          sparse_features=sparse_feature_with_values,
          dense_features=dense_features,
          example_labels=math_ops.cast(
              array_ops.reshape(targets, shape=[-1]), dtypes.float32),
          example_weights=example_weights,
          example_ids=example_ids)
      sdca_variables = dict(
          sparse_features_weights=sparse_feature_with_values_weights,
          dense_features_weights=dense_feature_weights)
      return examples, sdca_variables

    training_examples, training_variables = _training_examples_and_variables()
    sdca_model = sdca_ops.SdcaModel(
        examples=training_examples,
        variables=training_variables,
        options=dict(
            symmetric_l1_regularization=self._symmetric_l1_regularization,
            symmetric_l2_regularization=self._symmetric_l2_regularization,
            adaptive=self._adaptive,
            num_loss_partitions=self._num_loss_partitions,
            num_table_shards=self._num_table_shards,
            loss_type=loss_type))
    train_op = sdca_model.minimize(global_step=global_step)
    return sdca_model, train_op