1"""Linear Estimators.""" 2# Copyright 2015 The TensorFlow Authors. All Rights Reserved. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15from __future__ import absolute_import 16from __future__ import division 17from __future__ import print_function 18 19from tensorflow.contrib import layers 20from tensorflow.contrib.linear_optimizer.python.ops import sdca_ops 21from tensorflow.contrib.linear_optimizer.python.ops.sparse_feature_column import SparseFeatureColumn 22from tensorflow.python.framework import dtypes 23from tensorflow.python.framework import ops 24from tensorflow.python.ops import array_ops 25from tensorflow.python.ops import control_flow_ops 26from tensorflow.python.ops import math_ops 27 28 29# TODO(sibyl-vie3Poto, sibyl-Aix6ihai): Add proper testing to this wrapper once the API is 30# stable. 31class SDCAOptimizer(object): 32 """Wrapper class for SDCA optimizer. 33 34 The wrapper is currently meant for use as an optimizer within a tf.learn 35 Estimator. 36 37 Example usage: 38 39 ```python 40 real_feature_column = real_valued_column(...) 41 sparse_feature_column = sparse_column_with_hash_bucket(...) 42 sdca_optimizer = linear.SDCAOptimizer(example_id_column='example_id', 43 num_loss_partitions=1, 44 num_table_shards=1, 45 symmetric_l2_regularization=2.0) 46 classifier = tf.contrib.learn.LinearClassifier( 47 feature_columns=[real_feature_column, sparse_feature_column], 48 weight_column_name=..., 49 optimizer=sdca_optimizer) 50 classifier.fit(input_fn_train, steps=50) 51 classifier.evaluate(input_fn=input_fn_eval) 52 ``` 53 54 Here the expectation is that the `input_fn_*` functions passed to train and 55 evaluate return a pair (dict, label_tensor) where dict has `example_id_column` 56 as `key` whose value is a `Tensor` of shape [batch_size] and dtype string. 57 num_loss_partitions defines the number of partitions of the global loss 58 function and should be set to `(#concurrent train ops/per worker) 59 x (#workers)`. 60 Convergence of (global) loss is guaranteed if `num_loss_partitions` is larger 61 or equal to the above product. Larger values for `num_loss_partitions` lead to 62 slower convergence. The recommended value for `num_loss_partitions` in 63 `tf.learn` (where currently there is one process per worker) is the number 64 of workers running the train steps. It defaults to 1 (single machine). 65 `num_table_shards` defines the number of shards for the internal state 66 table, typically set to match the number of parameter servers for large 67 data sets. You can also specify a `partitioner` object to partition the primal 68 weights during training (`div` partitioning strategy will be used). 
69 """ 70 71 def __init__(self, 72 example_id_column, 73 num_loss_partitions=1, 74 num_table_shards=None, 75 symmetric_l1_regularization=0.0, 76 symmetric_l2_regularization=1.0, 77 adaptive=True, 78 partitioner=None): 79 self._example_id_column = example_id_column 80 self._num_loss_partitions = num_loss_partitions 81 self._num_table_shards = num_table_shards 82 self._symmetric_l1_regularization = symmetric_l1_regularization 83 self._symmetric_l2_regularization = symmetric_l2_regularization 84 self._adaptive = adaptive 85 self._partitioner = partitioner 86 87 def get_name(self): 88 return 'SDCAOptimizer' 89 90 @property 91 def example_id_column(self): 92 return self._example_id_column 93 94 @property 95 def num_loss_partitions(self): 96 return self._num_loss_partitions 97 98 @property 99 def num_table_shards(self): 100 return self._num_table_shards 101 102 @property 103 def symmetric_l1_regularization(self): 104 return self._symmetric_l1_regularization 105 106 @property 107 def symmetric_l2_regularization(self): 108 return self._symmetric_l2_regularization 109 110 @property 111 def adaptive(self): 112 return self._adaptive 113 114 @property 115 def partitioner(self): 116 return self._partitioner 117 118 def get_train_step(self, columns_to_variables, weight_column_name, loss_type, 119 features, targets, global_step): 120 """Returns the training operation of an SdcaModel optimizer.""" 121 122 def _dense_tensor_to_sparse_feature_column(dense_tensor): 123 """Returns SparseFeatureColumn for the input dense_tensor.""" 124 ignore_value = 0.0 125 sparse_indices = array_ops.where( 126 math_ops.not_equal(dense_tensor, 127 math_ops.cast(ignore_value, dense_tensor.dtype))) 128 sparse_values = array_ops.gather_nd(dense_tensor, sparse_indices) 129 # TODO(sibyl-Aix6ihai, sibyl-vie3Poto): Makes this efficient, as now SDCA supports 130 # very sparse features with weights and not weights. 131 return SparseFeatureColumn( 132 array_ops.reshape( 133 array_ops.split( 134 value=sparse_indices, num_or_size_splits=2, axis=1)[0], [-1]), 135 array_ops.reshape( 136 array_ops.split( 137 value=sparse_indices, num_or_size_splits=2, axis=1)[1], [-1]), 138 array_ops.reshape(math_ops.cast(sparse_values, dtypes.float32), [-1])) 139 140 def _training_examples_and_variables(): 141 """Returns dictionaries for training examples and variables.""" 142 batch_size = targets.get_shape()[0] 143 144 # Iterate over all feature columns and create appropriate lists for dense 145 # and sparse features as well as dense and sparse weights (variables) for 146 # SDCA. 147 # TODO(sibyl-vie3Poto): Reshape variables stored as values in column_to_variables 148 # dict as 1-dimensional tensors. 149 dense_features, sparse_features, sparse_feature_with_values = [], [], [] 150 dense_feature_weights = [] 151 sparse_feature_weights, sparse_feature_with_values_weights = [], [] 152 for column in sorted(columns_to_variables.keys(), key=lambda x: x.key): 153 transformed_tensor = features[column] 154 if isinstance(column, layers.feature_column._RealValuedColumn): # pylint: disable=protected-access 155 # A real-valued column corresponds to a dense feature in SDCA. A 156 # transformed tensor corresponding to a RealValuedColumn should have 157 # rank at most 2. In order to be passed to SDCA, its rank needs to be 158 # exactly 2 (i.e., its shape should be [batch_size, column.dim]). 
          check_rank_op = control_flow_ops.Assert(
              math_ops.less_equal(array_ops.rank(transformed_tensor), 2),
              ['transformed_tensor should have rank at most 2.'])
          # Reshape to [batch_size, dense_column_dimension].
          with ops.control_dependencies([check_rank_op]):
            transformed_tensor = array_ops.reshape(transformed_tensor, [
                array_ops.shape(transformed_tensor)[0], -1
            ])

          dense_features.append(transformed_tensor)
          # For real valued columns, the variables list contains exactly one
          # element.
          dense_feature_weights.append(columns_to_variables[column][0])
        elif isinstance(column, layers.feature_column._BucketizedColumn):  # pylint: disable=protected-access
          # A bucketized column corresponds to a sparse feature in SDCA. The
          # bucketized feature is "sparsified" for SDCA by converting it to a
          # SparseFeatureColumn representing the one-hot encoding of the
          # bucketized feature.
          #
          # TODO(sibyl-vie3Poto): Explore whether it is more efficient to
          # translate a bucketized feature column to a dense feature in SDCA.
          # This will likely depend on the number of buckets.
          dense_bucket_tensor = column._to_dnn_input_layer(transformed_tensor)  # pylint: disable=protected-access
          sparse_feature_column = _dense_tensor_to_sparse_feature_column(
              dense_bucket_tensor)
          sparse_feature_with_values.append(sparse_feature_column)
          # If a partitioner was used during variable creation, we will have a
          # list of Variables here with length larger than 1.
          vars_to_append = columns_to_variables[column][0]
          if len(columns_to_variables[column]) > 1:
            vars_to_append = columns_to_variables[column]
          sparse_feature_with_values_weights.append(vars_to_append)
        elif isinstance(
            column,
            (
                layers.feature_column._WeightedSparseColumn,  # pylint: disable=protected-access
                layers.feature_column._CrossedColumn,  # pylint: disable=protected-access
                layers.feature_column._SparseColumn)):  # pylint: disable=protected-access

          if isinstance(column, layers.feature_column._WeightedSparseColumn):  # pylint: disable=protected-access
            id_tensor = column.id_tensor(transformed_tensor)
            weight_tensor = array_ops.reshape(
                column.weight_tensor(transformed_tensor).values, [-1])
          else:
            id_tensor = transformed_tensor
            weight_tensor = array_ops.ones(
                [array_ops.shape(id_tensor.indices)[0]], dtypes.float32)

          example_ids = array_ops.reshape(id_tensor.indices[:, 0], [-1])

          flat_ids = array_ops.reshape(id_tensor.values, [-1])
          # Prune invalid IDs (< 0) from flat_ids, example_ids, and
          # weight_tensor. These can come from looking up an OOV entry in the
          # vocabulary (default value being -1).
          is_id_valid = math_ops.greater_equal(flat_ids, 0)
          flat_ids = array_ops.boolean_mask(flat_ids, is_id_valid)
          example_ids = array_ops.boolean_mask(example_ids, is_id_valid)
          weight_tensor = array_ops.boolean_mask(weight_tensor, is_id_valid)

          projection_length = math_ops.reduce_max(flat_ids) + 1
          # Project ids based on example ids so that we can de-duplicate ids
          # that occur multiple times for a single example.
          projected_ids = projection_length * example_ids + flat_ids

          # Remove any redundant ids.
          ids, idx = array_ops.unique(projected_ids)
          # Keep only one example id per duplicated id.
          example_ids_filtered = math_ops.unsorted_segment_min(
              example_ids, idx,
              array_ops.shape(ids)[0])

          # Reproject ids back into the feature id space.
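          # Illustrative walk-through (not from the original source): with
          # example_ids = [0, 0, 1], flat_ids = [3, 3, 1] and weight_tensor =
          # [1., 1., 1.], projection_length is 4, so projected_ids = [3, 3, 5].
          # unique() keeps ids = [3, 5], unsorted_segment_min recovers the
          # owning examples [0, 1], the subtraction below maps the ids back to
          # [3, 1], and unsorted_segment_sum yields weights [2., 1.].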
          reproject_ids = (ids - projection_length * example_ids_filtered)

          weights = array_ops.reshape(
              math_ops.unsorted_segment_sum(weight_tensor, idx,
                                            array_ops.shape(ids)[0]), [-1])
          sparse_feature_with_values.append(
              SparseFeatureColumn(example_ids_filtered, reproject_ids, weights))
          # If a partitioner was used during variable creation, we will have a
          # list of Variables here with length larger than 1.
          vars_to_append = columns_to_variables[column][0]
          if len(columns_to_variables[column]) > 1:
            vars_to_append = columns_to_variables[column]
          sparse_feature_with_values_weights.append(vars_to_append)
        else:
          raise ValueError('SDCAOptimizer does not support column type %s.' %
                           type(column).__name__)

      example_weights = array_ops.reshape(
          features[weight_column_name],
          shape=[-1]) if weight_column_name else array_ops.ones([batch_size])
      example_ids = features[self._example_id_column]
      sparse_feature_with_values.extend(sparse_features)
      sparse_feature_with_values_weights.extend(sparse_feature_weights)
      examples = dict(
          sparse_features=sparse_feature_with_values,
          dense_features=dense_features,
          example_labels=math_ops.cast(
              array_ops.reshape(targets, shape=[-1]), dtypes.float32),
          example_weights=example_weights,
          example_ids=example_ids)
      sdca_variables = dict(
          sparse_features_weights=sparse_feature_with_values_weights,
          dense_features_weights=dense_feature_weights)
      return examples, sdca_variables

    training_examples, training_variables = _training_examples_and_variables()
    sdca_model = sdca_ops.SdcaModel(
        examples=training_examples,
        variables=training_variables,
        options=dict(
            symmetric_l1_regularization=self._symmetric_l1_regularization,
            symmetric_l2_regularization=self._symmetric_l2_regularization,
            adaptive=self._adaptive,
            num_loss_partitions=self._num_loss_partitions,
            num_table_shards=self._num_table_shards,
            loss_type=loss_type))
    train_op = sdca_model.minimize(global_step=global_step)
    return sdca_model, train_op