1# Copyright 2018 The TensorFlow Authors. All Rights Reserved. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14# ============================================================================== 15"""Experimental API for controlling optimizations in `tf.data` pipelines.""" 16from __future__ import absolute_import 17from __future__ import division 18from __future__ import print_function 19 20import enum 21 22from tensorflow.core.framework import dataset_options_pb2 23from tensorflow.python.data.util import options 24from tensorflow.python.util.tf_export import tf_export 25 26# Do not modify. 27_ENABLE_AUTOTUNE_BUFFERS_BY_DEFAULT = False 28 29 30class _AutotuneAlgorithm(enum.Enum): 31 """Controls what algorithm is used in the autotune implementation.""" 32 HILL_CLIMB = 0 33 GRADIENT_DESCENT = 1 34 35 36@tf_export("data.experimental.MapVectorizationOptions") 37class MapVectorizationOptions(options.OptionsBase): 38 """Represents options for the MapVectorization optimization.""" 39 # TODO(rachelim): Other configuration parameters can go here, for example, 40 # how many "experiments" to run with ChooseFastestBranchDataset. 41 enabled = options.create_option( 42 name="enabled", 43 ty=bool, 44 docstring= 45 "Whether to vectorize map transformations. If None, defaults to False." 46 ) 47 48 use_choose_fastest = options.create_option( 49 name="use_choose_fastest", 50 ty=bool, 51 docstring="Whether to use ChooseFastestBranchDataset with this " 52 "transformation. If True, the pipeline picks between the vectorized and " 53 "original segment at runtime based on their iterations speed. If None, " 54 "defaults to False.") 55 56 def _graph_rewrites(self): 57 graph_rewrites = options.graph_rewrites() 58 result = graph_rewrites(enabled=[], disabled=[], default=[]) 59 if self.enabled is True: # pylint: disable=g-bool-id-comparison 60 result.enabled.append("map_vectorization") 61 elif self.enabled is False: # pylint: disable=g-bool-id-comparison 62 result.disabled.append("map_vectorization") 63 return result 64 65 def _graph_rewrite_configs(self): 66 if not self.enabled: 67 return [] 68 if self.use_choose_fastest: 69 return ["map_vectorization:use_choose_fastest:true"] 70 else: 71 return ["map_vectorization:use_choose_fastest:false"] 72 73 def _to_proto(self): 74 pb = dataset_options_pb2.MapVectorization() 75 if self.enabled is not None: 76 pb.enabled = self.enabled 77 if self.use_choose_fastest is not None: 78 pb.use_choose_fastest = self.use_choose_fastest 79 return pb 80 81 def _from_proto(self, pb): 82 if pb.WhichOneof("optional_enabled") is not None: 83 self.enabled = pb.enabled 84 if pb.WhichOneof("optional_use_choose_fastest") is not None: 85 self.use_choose_fastest = pb.use_choose_fastest 86 87 88@tf_export("data.experimental.OptimizationOptions") 89class OptimizationOptions(options.OptionsBase): 90 """Represents options for dataset optimizations. 91 92 You can set the optimization options of a dataset through the 93 `experimental_optimization` property of `tf.data.Options`; the property is 94 an instance of `tf.data.experimental.OptimizationOptions`. 95 96 ```python 97 options = tf.data.Options() 98 options.experimental_optimization.noop_elimination = True 99 options.experimental_optimization.map_vectorization.enabled = True 100 options.experimental_optimization.apply_default_optimizations = False 101 dataset = dataset.with_options(options) 102 ``` 103 """ 104 apply_default_optimizations = options.create_option( 105 name="apply_default_optimizations", 106 ty=bool, 107 docstring= 108 "Whether to apply default graph optimizations. If False, only graph " 109 "optimizations that have been explicitly enabled will be applied.") 110 111 autotune = options.create_option( 112 name="autotune", 113 ty=bool, 114 docstring= 115 "Whether to automatically tune performance knobs. If None, defaults to " 116 "True.") 117 118 autotune_buffers = options.create_option( 119 name="autotune_buffers", 120 ty=bool, 121 docstring= 122 "When autotuning is enabled (through `autotune`), determines whether to " 123 "also autotune buffer sizes for datasets with parallelism. If None," 124 " defaults to False.") 125 126 autotune_cpu_budget = options.create_option( 127 name="autotune_cpu_budget", 128 ty=int, 129 docstring= 130 "When autotuning is enabled (through `autotune`), determines the CPU " 131 "budget to use. Values greater than the number of schedulable CPU cores " 132 "are allowed but may result in CPU contention. If None, defaults to the " 133 "number of schedulable CPU cores.") 134 135 autotune_ram_budget = options.create_option( 136 name="autotune_ram_budget", 137 ty=int, 138 docstring= 139 "When autotuning is enabled (through `autotune`), determines the RAM " 140 "budget to use. Values greater than the available RAM in bytes may " 141 "result in OOM. If None, defaults to half of the available RAM in bytes.") 142 143 filter_fusion = options.create_option( 144 name="filter_fusion", 145 ty=bool, 146 docstring= 147 "Whether to fuse filter transformations. If None, defaults to False.") 148 149 filter_with_random_uniform_fusion = options.create_option( 150 name="filter_with_random_uniform_fusion", 151 ty=bool, 152 docstring= 153 "Whether to fuse filter dataset that predicts random_uniform < rate into " 154 "a sampling dataset. If None, defaults to False.") 155 156 hoist_random_uniform = options.create_option( 157 name="hoist_random_uniform", 158 ty=bool, 159 docstring= 160 "Whether to hoist `tf.random_uniform()` ops out of map transformations. " 161 "If None, defaults to False.") 162 163 map_and_batch_fusion = options.create_option( 164 name="map_and_batch_fusion", 165 ty=bool, 166 docstring= 167 "Whether to fuse map and batch transformations. If None, defaults to " 168 "True.") 169 170 map_and_filter_fusion = options.create_option( 171 name="map_and_filter_fusion", 172 ty=bool, 173 docstring= 174 "Whether to fuse map and filter transformations. If None, defaults to " 175 "False.") 176 177 map_fusion = options.create_option( 178 name="map_fusion", 179 ty=bool, 180 docstring="Whether to fuse map transformations. If None, defaults to " 181 "False.") 182 183 map_parallelization = options.create_option( 184 name="map_parallelization", 185 ty=bool, 186 docstring= 187 "Whether to parallelize stateless map transformations. If None, defaults " 188 "to False.") 189 190 map_vectorization = options.create_option( 191 name="map_vectorization", 192 ty=MapVectorizationOptions, 193 docstring= 194 "The map vectorization options associated with the dataset. See " 195 "`tf.data.experimental.MapVectorizationOptions` for more details.", 196 default_factory=MapVectorizationOptions) 197 198 noop_elimination = options.create_option( 199 name="noop_elimination", 200 ty=bool, 201 docstring= 202 "Whether to eliminate no-op transformations. If None, defaults to True.") 203 204 parallel_batch = options.create_option( 205 name="parallel_batch", 206 ty=bool, 207 docstring="Whether to parallelize copying of batch elements. This " 208 "optimization is highly experimental and can cause performance " 209 "degradation (e.g. when the parallelization overhead exceeds the " 210 "benefits of performing the data copies in parallel). You should only " 211 "enable this optimization if a) your input pipeline is bottlenecked on " 212 "batching and b) you have validated that this optimization improves " 213 "performance. If None, defaults to False.") 214 215 reorder_data_discarding_ops = options.create_option( 216 name="reorder_data_discarding_ops", 217 ty=bool, 218 docstring="Whether to reorder ops that will discard data to the front of " 219 "unary cardinality preserving transformations, e.g. " 220 "dataset.map(...).take(3) will be optimized to dataset.take(3).map(...). " 221 "For now this optimization will move `skip`, `shard` and `take` to the " 222 "front of `map` and `prefetch`. This optimization is only for " 223 "performance; it will not affect the output of the dataset. " 224 "If None, defaults to True.") 225 226 shuffle_and_repeat_fusion = options.create_option( 227 name="shuffle_and_repeat_fusion", 228 ty=bool, 229 docstring="Whether to fuse shuffle and repeat transformations. If None, " 230 "defaults to True.") 231 232 def _autotune_buffers(self): 233 if self.autotune_buffers is not None: 234 return self.autotune_buffers 235 # The default setting for autotune_buffers is based on 236 # _ENABLE_AUTOTUNE_BUFFERS_BY_DEFAULT 237 return _ENABLE_AUTOTUNE_BUFFERS_BY_DEFAULT 238 239 def _autotune_settings(self): 240 # Default autotune settings 241 autotune = True 242 243 # If autotune_buffers is enabled, we use the GRADIENT_DESCENT algorithm by 244 # default, which is more performant for tuning heterogeneous parameters. 245 algorithm = ( 246 _AutotuneAlgorithm.GRADIENT_DESCENT 247 if self._autotune_buffers() else _AutotuneAlgorithm.HILL_CLIMB) 248 cpu_budget = 0 # Indicates that all CPU cores should be used by default. 249 ram_budget = 0 # Indicates that default value of RAM budget should be used. 250 251 # Set these options if they are explicitly set by the user. 252 if self.autotune is False: # pylint: disable=g-bool-id-comparison 253 autotune = False 254 if self.autotune_cpu_budget is not None: 255 cpu_budget = self.autotune_cpu_budget 256 if self.autotune_ram_budget is not None: 257 ram_budget = self.autotune_ram_budget 258 259 return autotune, algorithm, cpu_budget, ram_budget 260 261 def _graph_rewrites(self): 262 """Produces lists of enabled, disabled and default graph optimizations. 263 264 Returns: 265 result: a namedtuple with three attributes. `result.enabled` is the list 266 of user enabled optimizations. `result.disabled` is the list of user 267 disabled optimizations. `result.default` is the list of optimizations 268 that are enabled by default (the user has not explicitly enabled or 269 disabled them). 270 """ 271 if self.map_vectorization is not None: 272 result = self.map_vectorization._graph_rewrites() # pylint: disable=protected-access 273 else: 274 result = MapVectorizationOptions()._graph_rewrites() # pylint: disable=protected-access 275 276 all_optimizations = [ 277 "filter_fusion", 278 "filter_with_random_uniform_fusion", 279 "hoist_random_uniform", 280 "map_and_batch_fusion", 281 "map_and_filter_fusion", 282 "map_parallelization", 283 "map_fusion", 284 "noop_elimination", 285 "parallel_batch", 286 "reorder_data_discarding_ops", 287 "shuffle_and_repeat_fusion", 288 ] 289 290 if self.apply_default_optimizations is not False: # pylint: disable=g-bool-id-comparison 291 # The following optimizations are turned on by default, unless the user 292 # explicitly disables them. 293 optimizations_to_disable = [ 294 "map_and_batch_fusion", 295 "map_parallelization", 296 "noop_elimination", 297 "shuffle_and_repeat_fusion", 298 ] 299 for optimization in optimizations_to_disable: 300 if getattr(self, optimization) is None: 301 result.default.append(optimization) 302 303 # Each of these attributes on the Options object is either True (explicitly 304 # enabled), False (explicitly disabled), or None (default). 305 for optimization in all_optimizations: 306 if getattr(self, optimization) is True: # pylint: disable=g-bool-id-comparison 307 result.enabled.append(optimization) 308 elif getattr(self, optimization) is False: # pylint: disable=g-bool-id-comparison 309 result.disabled.append(optimization) 310 311 autotune_buffers = self._autotune_buffers() 312 if self.autotune is not False and autotune_buffers is True: # pylint: disable=g-bool-id-comparison 313 # When autotuning buffer sizes is enabled, we inject a `prefetch` 314 # transformation after asynchronous dataset ops. Only the buffer sizes of 315 # prefetch transformations will be autotuned, though this is practically 316 # equivalent to tuning the buffer sizes of the other asynchronous 317 # transformations. 318 result.enabled.append("autotune_buffer_sizes") 319 result.enabled.append("disable_prefetch_legacy_autotune") 320 321 if self.autotune is False: # pylint: disable=g-bool-id-comparison 322 result.disabled.append("autotune_buffer_sizes") 323 result.disabled.append("disable_prefetch_legacy_autotune") 324 325 return result 326 327 def _graph_rewrite_configs(self, autotune): 328 if self.map_vectorization is not None: 329 graph_rewrite_configs = self.map_vectorization._graph_rewrite_configs() # pylint: disable=protected-access 330 else: 331 graph_rewrite_configs = [] 332 autotune_only_optimizations = [ 333 "autotune_buffer_sizes", 334 "batch_parallelization", 335 "disable_prefetch_legacy_autotune", 336 "enable_gradient_descent", 337 "map_parallelization" 338 ] 339 if autotune is False: # pylint: disable=g-bool-id-comparison 340 for optimization in autotune_only_optimizations: 341 graph_rewrite_configs.append(optimization + ":autotune:false") 342 else: 343 for optimization in autotune_only_optimizations: 344 graph_rewrite_configs.append(optimization + ":autotune:true") 345 346 return graph_rewrite_configs 347 348 def _to_proto(self): 349 pb = dataset_options_pb2.OptimizationOptions() 350 if self.apply_default_optimizations is not None: 351 pb.apply_default_optimizations = self.apply_default_optimizations 352 if self.autotune is not None: 353 pb.autotune = self.autotune 354 if self.autotune_buffers is not None: 355 pb.autotune_buffers = self.autotune_buffers 356 if self.autotune_cpu_budget is not None: 357 pb.autotune_cpu_budget = self.autotune_cpu_budget 358 if self.autotune_ram_budget is not None: 359 pb.autotune_ram_budget = self.autotune_ram_budget 360 if self.filter_fusion is not None: 361 pb.filter_fusion = self.filter_fusion 362 if self.filter_with_random_uniform_fusion is not None: 363 pb.filter_with_random_uniform_fusion = ( 364 self.filter_with_random_uniform_fusion) 365 if self.hoist_random_uniform is not None: 366 pb.hoist_random_uniform = self.hoist_random_uniform 367 if self.map_and_batch_fusion is not None: 368 pb.map_and_batch_fusion = self.map_and_batch_fusion 369 if self.map_and_filter_fusion is not None: 370 pb.map_and_filter_fusion = self.map_and_filter_fusion 371 if self.map_fusion is not None: 372 pb.map_fusion = self.map_fusion 373 if self.map_parallelization is not None: 374 pb.map_parallelization = self.map_parallelization 375 pb.map_vectorization.CopyFrom(self.map_vectorization._to_proto()) # pylint: disable=protected-access 376 if self.noop_elimination is not None: 377 pb.noop_elimination = self.noop_elimination 378 if self.parallel_batch is not None: 379 pb.parallel_batch = self.parallel_batch 380 if self.reorder_data_discarding_ops is not None: 381 pb.reorder_data_discarding_ops = self.reorder_data_discarding_ops 382 if self.shuffle_and_repeat_fusion is not None: 383 pb.shuffle_and_repeat_fusion = self.shuffle_and_repeat_fusion 384 return pb 385 386 def _from_proto(self, pb): 387 if pb.WhichOneof("optional_apply_default_optimizations") is not None: 388 self.apply_default_optimizations = pb.apply_default_optimizations 389 if pb.WhichOneof("optional_autotune") is not None: 390 self.autotune = pb.autotune 391 if pb.WhichOneof("optional_autotune_buffers") is not None: 392 self.autotune_buffers = pb.autotune_buffers 393 if pb.WhichOneof("optional_autotune_cpu_budget") is not None: 394 self.autotune_cpu_budget = pb.autotune_cpu_budget 395 if pb.WhichOneof("optional_autotune_ram_budget") is not None: 396 self.autotune_ram_budget = pb.autotune_ram_budget 397 if pb.WhichOneof("optional_filter_fusion") is not None: 398 self.filter_fusion = pb.filter_fusion 399 if pb.WhichOneof("optional_filter_with_random_uniform_fusion") is not None: 400 self.filter_with_random_uniform_fusion = ( 401 pb.filter_with_random_uniform_fusion) 402 if pb.WhichOneof("optional_hoist_random_uniform") is not None: 403 self.hoist_random_uniform = pb.hoist_random_uniform 404 if pb.WhichOneof("optional_map_and_batch_fusion") is not None: 405 self.map_and_batch_fusion = pb.map_and_batch_fusion 406 if pb.WhichOneof("optional_map_and_filter_fusion") is not None: 407 self.map_and_filter_fusion = pb.map_and_filter_fusion 408 if pb.WhichOneof("optional_map_fusion") is not None: 409 self.map_fusion = pb.map_fusion 410 if pb.WhichOneof("optional_map_parallelization") is not None: 411 self.map_parallelization = pb.map_parallelization 412 self.map_vectorization._from_proto(pb.map_vectorization) # pylint: disable=protected-access 413 if pb.WhichOneof("optional_noop_elimination") is not None: 414 self.noop_elimination = pb.noop_elimination 415 if pb.WhichOneof("optional_parallel_batch") is not None: 416 self.parallel_batch = pb.parallel_batch 417 if pb.WhichOneof("optional_reorder_data_discarding_ops") is not None: 418 self.reorder_data_discarding_ops = pb.reorder_data_discarding_ops 419 if pb.WhichOneof("optional_shuffle_and_repeat_fusion") is not None: 420 self.shuffle_and_repeat_fusion = pb.shuffle_and_repeat_fusion 421