1# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15"""Input-pipeline utilities for Distribution strategies."""
16
17from __future__ import absolute_import
18from __future__ import division
19from __future__ import print_function
20
21from tensorflow.python.data.ops import dataset_ops
22from tensorflow.python.data.util import traverse
23from tensorflow.python.framework import op_def_registry
24from tensorflow.python.framework import ops
25from tensorflow.python.platform import tf_logging
26
27
28# TODO(priyag): Any other reader datasets to consider here?
29_READER_DATASET_OPS = [
30    "TextLineDataset", "TFRecordDataset", "FixedLengthRecordDataset",
31    "FixedLengthRecordDatasetV2"
32]
33
34
35# pylint: disable=protected-access
def auto_shard_dataset(dataset, num_shards, index):
  """Intended to shard the input pipeline by its files; currently a no-op.

  Auto-sharding is disabled (tracked in b/120673685). This function only logs
  a warning asking the caller to shard the input manually (for example with
  `tf.data.Dataset.shard`) and returns `dataset` unchanged. `num_shards` and
  `index` are accepted but ignored.

  Args:
    dataset: A `tf.data.Dataset` instance, typically the result of a bunch of
      dataset transformations.
    num_shards: A `tf.int64` scalar `tf.Tensor`, representing the number of
      shards operating in parallel. Same usage as in `tf.data.Dataset.shard`.
      Currently ignored.
    index: A `tf.int64` scalar `tf.Tensor`, representing the worker index.
      Same usage as in `tf.data.Dataset.shard`. Currently ignored.

  Returns:
    The input `dataset`, unmodified.
  """
  # TODO(rohanj): b/120673685 to track re-enabling auto sharding.
  # Use `warning`; `warn` is only a deprecated alias in Python logging.
  tf_logging.warning("Autosharding is currently disabled. Please shard your "
                     "input manually.")
  # Unused until auto-sharding is re-enabled.
  del num_shards, index
  return dataset
58
59
def _clone_dataset(dataset):
  """Builds a copy of `dataset` with its variant-producing ops re-created.

  The ops returned by `traverse.obtain_all_variant_tensor_ops(dataset)` are
  cloned (through `_clone_helper`) starting from the op that produces
  `dataset`'s variant tensor. The clone of that final op is then wrapped back
  into a dataset carrying the original element structure.

  Args:
    dataset: The `tf.data.Dataset` to copy.

  Returns:
    A `dataset_ops._VariantDataset` built on the cloned variant tensor.
  """
  ops_to_copy = traverse.obtain_all_variant_tensor_ops(dataset)
  final_op = dataset._variant_tensor.op
  old_to_new = _clone_helper(final_op, ops_to_copy)
  cloned_variant = old_to_new[final_op].outputs[0]
  return dataset_ops._VariantDataset(
      cloned_variant, dataset._element_structure)
67
68
69def _get_op_def(op):
70  return op.op_def or op_def_registry.get_registered_ops()[op.type]
71
72
def _clone_helper(op_to_clone, variant_tensor_ops):
  """Helper method that recursively clones `op_to_clone`.

  Inputs produced by an op in `variant_tensor_ops` are cloned first
  (depth-first), and the clone's matching output is wired in place of the
  original input. All other inputs are reused as-is. Each op is cloned at most
  once, so an op that feeds several consumers maps to a single clone and the
  sharing structure of the original graph is preserved.

  New ops are created in the current default graph with the original op's
  type, output dtypes, attributes and `OpDef`. The original op's name is
  passed as the requested name.
  NOTE(review): control inputs of the original ops are not carried over.

  Args:
    op_to_clone: The op we want to clone.
    variant_tensor_ops: A collection (e.g. a list) of ops that we have to clone
      along the way.

  Returns:
    A dictionary mapping old_ops to new_ops created. Includes op_to_clone
    as a key.
  """
  remap_dict = {}
  # Build the lookup set once; `in` on the caller's list is O(n) per test.
  ops_needing_clone = set(variant_tensor_ops)
  graph = ops.get_default_graph()

  def _clone(op):
    """Clones `op` and its variant-producing inputs, reusing earlier clones."""
    # Memoize: without this, ops reachable through several consumers would be
    # cloned once per path, duplicating parts of the pipeline.
    if op in remap_dict:
      return remap_dict[op]
    new_inputs = []
    for input_tensor in op.inputs:
      input_op = input_tensor.op
      if input_op in ops_needing_clone:
        # Take the same output slot of the cloned producer that the original
        # input tensor came from.
        new_inputs.append(_clone(input_op).outputs[input_tensor.value_index])
      else:
        new_inputs.append(input_tensor)
    new_op = graph.create_op(
        op.type,
        new_inputs, [o.dtype for o in op.outputs],
        name=op.name,
        attrs=op.node_def.attr,
        op_def=_get_op_def(op))
    remap_dict[op] = new_op
    return new_op

  _clone(op_to_clone)
  return remap_dict
107