1# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Experimental API for controlling optimizations in `tf.data` pipelines."""
16from __future__ import absolute_import
17from __future__ import division
18from __future__ import print_function
19
20import enum
21
22from tensorflow.core.framework import dataset_options_pb2
23from tensorflow.python.data.util import options
24from tensorflow.python.util.tf_export import tf_export
25
# Do not modify.
# Default used by `OptimizationOptions._autotune_buffers()` when the user has
# not explicitly set `autotune_buffers`; flipping it changes the default
# autotune algorithm selection (HILL_CLIMB vs. GRADIENT_DESCENT) as well.
_ENABLE_AUTOTUNE_BUFFERS_BY_DEFAULT = False
28
29
class _AutotuneAlgorithm(enum.Enum):
  """Controls what algorithm is used in the autotune implementation.

  HILL_CLIMB is the default; GRADIENT_DESCENT is selected when buffer
  autotuning is enabled (see `OptimizationOptions._autotune_settings`).
  """
  HILL_CLIMB = 0
  GRADIENT_DESCENT = 1
34
35
@tf_export("data.experimental.MapVectorizationOptions")
class MapVectorizationOptions(options.OptionsBase):
  """Represents options for the MapVectorization optimization."""
  # TODO(rachelim): Other configuration parameters can go here, for example,
  # how many "experiments" to run with ChooseFastestBranchDataset.
  enabled = options.create_option(
      name="enabled",
      ty=bool,
      docstring="Whether to vectorize map transformations. If None, defaults "
      "to False.")

  use_choose_fastest = options.create_option(
      name="use_choose_fastest",
      ty=bool,
      docstring="Whether to use ChooseFastestBranchDataset with this "
      "transformation. If True, the pipeline picks between the vectorized "
      "and original segment at runtime based on their iterations speed. If "
      "None, defaults to False.")

  def _graph_rewrites(self):
    """Classifies the map_vectorization rewrite as enabled or disabled.

    Returns:
      A `graph_rewrites` namedtuple whose `enabled` (or `disabled`) list
      contains "map_vectorization" when the option was explicitly set to True
      (or False). An unset (None) option lands in neither list.
    """
    rewrites = options.graph_rewrites()(enabled=[], disabled=[], default=[])
    if self.enabled is True:  # pylint: disable=g-bool-id-comparison
      rewrites.enabled.append("map_vectorization")
    if self.enabled is False:  # pylint: disable=g-bool-id-comparison
      rewrites.disabled.append("map_vectorization")
    return rewrites

  def _graph_rewrite_configs(self):
    """Returns rewrite config strings to pass to the graph rewriter."""
    if not self.enabled:
      return []
    if self.use_choose_fastest:
      return ["map_vectorization:use_choose_fastest:true"]
    return ["map_vectorization:use_choose_fastest:false"]

  def _to_proto(self):
    """Serializes explicitly-set fields into a `MapVectorization` proto."""
    pb = dataset_options_pb2.MapVectorization()
    for field in ("enabled", "use_choose_fastest"):
      value = getattr(self, field)
      # Leave unset (None) options out of the proto entirely.
      if value is not None:
        setattr(pb, field, value)
    return pb

  def _from_proto(self, pb):
    """Restores fields from `pb`, touching only explicitly-set ones."""
    for field in ("enabled", "use_choose_fastest"):
      # WhichOneof distinguishes "never set" from "set to the default value".
      if pb.WhichOneof("optional_" + field) is not None:
        setattr(self, field, getattr(pb, field))
86
87
@tf_export("data.experimental.OptimizationOptions")
class OptimizationOptions(options.OptionsBase):
  """Represents options for dataset optimizations.

  You can set the optimization options of a dataset through the
  `experimental_optimization` property of `tf.data.Options`; the property is
  an instance of `tf.data.experimental.OptimizationOptions`.

  ```python
  options = tf.data.Options()
  options.experimental_optimization.noop_elimination = True
  options.experimental_optimization.map_vectorization.enabled = True
  options.experimental_optimization.apply_default_optimizations = False
  dataset = dataset.with_options(options)
  ```
  """
  apply_default_optimizations = options.create_option(
      name="apply_default_optimizations",
      ty=bool,
      docstring=
      "Whether to apply default graph optimizations. If False, only graph "
      "optimizations that have been explicitly enabled will be applied.")

  autotune = options.create_option(
      name="autotune",
      ty=bool,
      docstring=
      "Whether to automatically tune performance knobs. If None, defaults to "
      "True.")

  autotune_buffers = options.create_option(
      name="autotune_buffers",
      ty=bool,
      docstring=
      "When autotuning is enabled (through `autotune`), determines whether to "
      "also autotune buffer sizes for datasets with parallelism. If None,"
      " defaults to False.")

  autotune_cpu_budget = options.create_option(
      name="autotune_cpu_budget",
      ty=int,
      docstring=
      "When autotuning is enabled (through `autotune`), determines the CPU "
      "budget to use. Values greater than the number of schedulable CPU cores "
      "are allowed but may result in CPU contention. If None, defaults to the "
      "number of schedulable CPU cores.")

  autotune_ram_budget = options.create_option(
      name="autotune_ram_budget",
      ty=int,
      docstring=
      "When autotuning is enabled (through `autotune`), determines the RAM "
      "budget to use. Values greater than the available RAM in bytes may "
      "result in OOM. If None, defaults to half of the available RAM in bytes.")

  filter_fusion = options.create_option(
      name="filter_fusion",
      ty=bool,
      docstring=
      "Whether to fuse filter transformations. If None, defaults to False.")

  filter_with_random_uniform_fusion = options.create_option(
      name="filter_with_random_uniform_fusion",
      ty=bool,
      docstring=
      "Whether to fuse filter dataset that predicts random_uniform < rate into "
      "a sampling dataset. If None, defaults to False.")

  hoist_random_uniform = options.create_option(
      name="hoist_random_uniform",
      ty=bool,
      docstring=
      "Whether to hoist `tf.random_uniform()` ops out of map transformations. "
      "If None, defaults to False.")

  map_and_batch_fusion = options.create_option(
      name="map_and_batch_fusion",
      ty=bool,
      docstring=
      "Whether to fuse map and batch transformations. If None, defaults to "
      "True.")

  map_and_filter_fusion = options.create_option(
      name="map_and_filter_fusion",
      ty=bool,
      docstring=
      "Whether to fuse map and filter transformations. If None, defaults to "
      "False.")

  map_fusion = options.create_option(
      name="map_fusion",
      ty=bool,
      docstring="Whether to fuse map transformations. If None, defaults to "
      "False.")

  map_parallelization = options.create_option(
      name="map_parallelization",
      ty=bool,
      docstring=
      "Whether to parallelize stateless map transformations. If None, defaults "
      "to False.")

  map_vectorization = options.create_option(
      name="map_vectorization",
      ty=MapVectorizationOptions,
      docstring=
      "The map vectorization options associated with the dataset. See "
      "`tf.data.experimental.MapVectorizationOptions` for more details.",
      default_factory=MapVectorizationOptions)

  noop_elimination = options.create_option(
      name="noop_elimination",
      ty=bool,
      docstring=
      "Whether to eliminate no-op transformations. If None, defaults to True.")

  parallel_batch = options.create_option(
      name="parallel_batch",
      ty=bool,
      docstring="Whether to parallelize copying of batch elements. This "
      "optimization is highly experimental and can cause performance "
      "degradation (e.g. when the parallelization overhead exceeds the "
      "benefits of performing the data copies in parallel). You should only "
      "enable this optimization if a) your input pipeline is bottlenecked on "
      "batching and b) you have validated that this optimization improves "
      "performance. If None, defaults to False.")

  reorder_data_discarding_ops = options.create_option(
      name="reorder_data_discarding_ops",
      ty=bool,
      docstring="Whether to reorder ops that will discard data to the front of "
      "unary cardinality preserving transformations, e.g. "
      "dataset.map(...).take(3) will be optimized to dataset.take(3).map(...). "
      "For now this optimization will move `skip`, `shard` and `take` to the "
      "front of `map` and `prefetch`. This optimization is only for "
      "performance; it will not affect the output of the dataset. "
      "If None, defaults to True.")

  shuffle_and_repeat_fusion = options.create_option(
      name="shuffle_and_repeat_fusion",
      ty=bool,
      docstring="Whether to fuse shuffle and repeat transformations. If None, "
      "defaults to True.")

  def _autotune_buffers(self):
    """Resolves the effective `autotune_buffers` value.

    Returns:
      The user-provided `autotune_buffers` value if it was explicitly set,
      otherwise the module-level default
      `_ENABLE_AUTOTUNE_BUFFERS_BY_DEFAULT`.
    """
    if self.autotune_buffers is not None:
      return self.autotune_buffers
    # The default setting for autotune_buffers is based on
    # _ENABLE_AUTOTUNE_BUFFERS_BY_DEFAULT
    return _ENABLE_AUTOTUNE_BUFFERS_BY_DEFAULT

  def _autotune_settings(self):
    """Resolves the autotune configuration for this options object.

    Returns:
      A tuple `(autotune, algorithm, cpu_budget, ram_budget)`: a bool, an
      `_AutotuneAlgorithm`, and two int budgets where 0 means "let the
      runtime pick its default".
    """
    # Default autotune settings
    autotune = True

    # If autotune_buffers is enabled, we use the GRADIENT_DESCENT algorithm by
    # default, which is more performant for tuning heterogeneous parameters.
    algorithm = (
        _AutotuneAlgorithm.GRADIENT_DESCENT
        if self._autotune_buffers() else _AutotuneAlgorithm.HILL_CLIMB)
    cpu_budget = 0  # Indicates that all CPU cores should be used by default.
    ram_budget = 0  # Indicates that default value of RAM budget should be used.

    # Set these options if they are explicitly set by the user.
    if self.autotune is False:  # pylint: disable=g-bool-id-comparison
      autotune = False
    if self.autotune_cpu_budget is not None:
      cpu_budget = self.autotune_cpu_budget
    if self.autotune_ram_budget is not None:
      ram_budget = self.autotune_ram_budget

    return autotune, algorithm, cpu_budget, ram_budget

  def _graph_rewrites(self):
    """Produces lists of enabled, disabled and default graph optimizations.

    Returns:
      result: a namedtuple with three attributes. `result.enabled` is the list
        of user enabled optimizations. `result.disabled` is the list of user
        disabled optimizations. `result.default` is the list of optimizations
        that are enabled by default (the user has not explicitly enabled or
        disabled them).
    """
    # Seed the result with the map_vectorization sub-options' classification
    # so everything is merged into a single namedtuple.
    if self.map_vectorization is not None:
      result = self.map_vectorization._graph_rewrites()  # pylint: disable=protected-access
    else:
      result = MapVectorizationOptions()._graph_rewrites()  # pylint: disable=protected-access

    all_optimizations = [
        "filter_fusion",
        "filter_with_random_uniform_fusion",
        "hoist_random_uniform",
        "map_and_batch_fusion",
        "map_and_filter_fusion",
        "map_parallelization",
        "map_fusion",
        "noop_elimination",
        "parallel_batch",
        "reorder_data_discarding_ops",
        "shuffle_and_repeat_fusion",
    ]

    if self.apply_default_optimizations is not False:  # pylint: disable=g-bool-id-comparison
      # The following optimizations are turned on by default, unless the user
      # explicitly disables them.
      optimizations_to_disable = [
          "map_and_batch_fusion",
          "map_parallelization",
          "noop_elimination",
          "shuffle_and_repeat_fusion",
      ]
      for optimization in optimizations_to_disable:
        if getattr(self, optimization) is None:
          result.default.append(optimization)

    # Each of these attributes on the Options object is either True (explicitly
    # enabled), False (explicitly disabled), or None (default).
    for optimization in all_optimizations:
      if getattr(self, optimization) is True:  # pylint: disable=g-bool-id-comparison
        result.enabled.append(optimization)
      elif getattr(self, optimization) is False:  # pylint: disable=g-bool-id-comparison
        result.disabled.append(optimization)

    autotune_buffers = self._autotune_buffers()
    if self.autotune is not False and autotune_buffers is True:  # pylint: disable=g-bool-id-comparison
      # When autotuning buffer sizes is enabled, we inject a `prefetch`
      # transformation after asynchronous dataset ops. Only the buffer sizes of
      # prefetch transformations will be autotuned, though this is practically
      # equivalent to tuning the buffer sizes of the other asynchronous
      # transformations.
      result.enabled.append("autotune_buffer_sizes")
      result.enabled.append("disable_prefetch_legacy_autotune")

    if self.autotune is False:  # pylint: disable=g-bool-id-comparison
      result.disabled.append("autotune_buffer_sizes")
      result.disabled.append("disable_prefetch_legacy_autotune")

    return result

  def _graph_rewrite_configs(self, autotune):
    """Returns "<rewrite>:autotune:<bool>" config strings for the rewriter.

    Args:
      autotune: whether autotuning is enabled; forwarded as the ":autotune:"
        flag on each autotune-only rewrite config.

    Returns:
      A list of config strings, including any produced by the
      map_vectorization sub-options.
    """
    if self.map_vectorization is not None:
      graph_rewrite_configs = self.map_vectorization._graph_rewrite_configs()  # pylint: disable=protected-access
    else:
      graph_rewrite_configs = []
    # These rewrites only make sense when autotuning is on, so each carries an
    # explicit autotune:true/false flag.
    autotune_only_optimizations = [
        "autotune_buffer_sizes",
        "batch_parallelization",
        "disable_prefetch_legacy_autotune",
        "enable_gradient_descent",
        "map_parallelization"
    ]
    if autotune is False:  # pylint: disable=g-bool-id-comparison
      for optimization in autotune_only_optimizations:
        graph_rewrite_configs.append(optimization + ":autotune:false")
    else:
      for optimization in autotune_only_optimizations:
        graph_rewrite_configs.append(optimization + ":autotune:true")

    return graph_rewrite_configs

  def _to_proto(self):
    """Serializes explicitly-set options into an `OptimizationOptions` proto.

    Unset (None) options are left out of the proto so that round-tripping
    preserves the "unset" state.
    """
    pb = dataset_options_pb2.OptimizationOptions()
    if self.apply_default_optimizations is not None:
      pb.apply_default_optimizations = self.apply_default_optimizations
    if self.autotune is not None:
      pb.autotune = self.autotune
    if self.autotune_buffers is not None:
      pb.autotune_buffers = self.autotune_buffers
    if self.autotune_cpu_budget is not None:
      pb.autotune_cpu_budget = self.autotune_cpu_budget
    if self.autotune_ram_budget is not None:
      pb.autotune_ram_budget = self.autotune_ram_budget
    if self.filter_fusion is not None:
      pb.filter_fusion = self.filter_fusion
    if self.filter_with_random_uniform_fusion is not None:
      pb.filter_with_random_uniform_fusion = (
          self.filter_with_random_uniform_fusion)
    if self.hoist_random_uniform is not None:
      pb.hoist_random_uniform = self.hoist_random_uniform
    if self.map_and_batch_fusion is not None:
      pb.map_and_batch_fusion = self.map_and_batch_fusion
    if self.map_and_filter_fusion is not None:
      pb.map_and_filter_fusion = self.map_and_filter_fusion
    if self.map_fusion is not None:
      pb.map_fusion = self.map_fusion
    if self.map_parallelization is not None:
      pb.map_parallelization = self.map_parallelization
    # map_vectorization is a nested message with a default_factory, so it is
    # always present and serialized unconditionally.
    pb.map_vectorization.CopyFrom(self.map_vectorization._to_proto())  # pylint: disable=protected-access
    if self.noop_elimination is not None:
      pb.noop_elimination = self.noop_elimination
    if self.parallel_batch is not None:
      pb.parallel_batch = self.parallel_batch
    if self.reorder_data_discarding_ops is not None:
      pb.reorder_data_discarding_ops = self.reorder_data_discarding_ops
    if self.shuffle_and_repeat_fusion is not None:
      pb.shuffle_and_repeat_fusion = self.shuffle_and_repeat_fusion
    return pb

  def _from_proto(self, pb):
    """Restores option values from `pb`, touching only explicitly-set fields.

    `WhichOneof` distinguishes a field that was never set from one set to its
    default value, so unset fields stay None here.
    """
    if pb.WhichOneof("optional_apply_default_optimizations") is not None:
      self.apply_default_optimizations = pb.apply_default_optimizations
    if pb.WhichOneof("optional_autotune") is not None:
      self.autotune = pb.autotune
    if pb.WhichOneof("optional_autotune_buffers") is not None:
      self.autotune_buffers = pb.autotune_buffers
    if pb.WhichOneof("optional_autotune_cpu_budget") is not None:
      self.autotune_cpu_budget = pb.autotune_cpu_budget
    if pb.WhichOneof("optional_autotune_ram_budget") is not None:
      self.autotune_ram_budget = pb.autotune_ram_budget
    if pb.WhichOneof("optional_filter_fusion") is not None:
      self.filter_fusion = pb.filter_fusion
    if pb.WhichOneof("optional_filter_with_random_uniform_fusion") is not None:
      self.filter_with_random_uniform_fusion = (
          pb.filter_with_random_uniform_fusion)
    if pb.WhichOneof("optional_hoist_random_uniform") is not None:
      self.hoist_random_uniform = pb.hoist_random_uniform
    if pb.WhichOneof("optional_map_and_batch_fusion") is not None:
      self.map_and_batch_fusion = pb.map_and_batch_fusion
    if pb.WhichOneof("optional_map_and_filter_fusion") is not None:
      self.map_and_filter_fusion = pb.map_and_filter_fusion
    if pb.WhichOneof("optional_map_fusion") is not None:
      self.map_fusion = pb.map_fusion
    if pb.WhichOneof("optional_map_parallelization") is not None:
      self.map_parallelization = pb.map_parallelization
    self.map_vectorization._from_proto(pb.map_vectorization)  # pylint: disable=protected-access
    if pb.WhichOneof("optional_noop_elimination") is not None:
      self.noop_elimination = pb.noop_elimination
    if pb.WhichOneof("optional_parallel_batch") is not None:
      self.parallel_batch = pb.parallel_batch
    if pb.WhichOneof("optional_reorder_data_discarding_ops") is not None:
      self.reorder_data_discarding_ops = pb.reorder_data_discarding_ops
    if pb.WhichOneof("optional_shuffle_and_repeat_fusion") is not None:
      self.shuffle_and_repeat_fusion = pb.shuffle_and_repeat_fusion
421