1# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14# ==============================================================================
15
16"""Input pipeline.
17
18Please see the [reading data
19how-to](https://tensorflow.org/api_guides/python/reading_data)
20for context.
21"""
22
23from __future__ import absolute_import
24from __future__ import division
25from __future__ import print_function
26
27from six.moves import xrange  # pylint: disable=redefined-builtin
28
29from tensorflow.python.eager import context
30from tensorflow.python.framework import constant_op
31from tensorflow.python.framework import dtypes
32from tensorflow.python.framework import ops
33from tensorflow.python.framework import sparse_tensor
34from tensorflow.python.framework import tensor_shape
35from tensorflow.python.layers import utils
36from tensorflow.python.ops import array_ops
37from tensorflow.python.ops import control_flow_ops
38from tensorflow.python.ops import data_flow_ops
39from tensorflow.python.ops import io_ops
40from tensorflow.python.ops import math_ops
41from tensorflow.python.ops import random_ops
42from tensorflow.python.ops import sparse_ops
43from tensorflow.python.ops import variable_scope as vs
44from tensorflow.python.summary import summary
45from tensorflow.python.training import queue_runner
46from tensorflow.python.util import deprecation
47from tensorflow.python.util.compat import collections_abc
48from tensorflow.python.util.tf_export import tf_export
49
50
# pylint: disable=protected-access
# Short aliases for private `sparse_ops` helpers used by the batching code in
# this module: `_store_sparse` / `_store_many_sparse` add SparseTensors to a
# SparseTensorsMap (yielding int64 handles), and `_restore_sparse` takes them
# back out of the map given those handles.
_store_sparse = sparse_ops._add_sparse_to_tensors_map
_store_many_sparse = sparse_ops._add_many_sparse_to_tensors_map
_restore_sparse = sparse_ops._take_many_sparse_from_tensors_map
# pylint: enable=protected-access
56
57
@tf_export(
    "io.match_filenames_once",
    v1=["io.match_filenames_once", "train.match_filenames_once"])
@deprecation.deprecated_endpoints("train.match_filenames_once")
def match_filenames_once(pattern, name=None):
  """Save the list of files matching pattern, so it is only computed once.

  NOTE: The order of the files returned is deterministic.

  The glob is evaluated as the initial value of a non-trainable local
  variable, so it runs when local variables are initialized rather than on
  every read.

  Args:
    pattern: A file pattern (glob), or 1D tensor of file patterns.
    name: A name for the operations (optional).

  Returns:
    A variable that is initialized to the list of files matching the pattern(s).
  """
  with ops.name_scope(name, "matching_filenames", [pattern]) as scope:
    matched_files = io_ops.matching_files(pattern)
    # The number of matches is unknown statically, hence validate_shape=False.
    return vs.variable(
        initial_value=matched_files,
        name=scope,
        trainable=False,
        validate_shape=False,
        collections=[ops.GraphKeys.LOCAL_VARIABLES])
79
80
@tf_export(v1=["train.limit_epochs"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensors(tensor).repeat(num_epochs)`.")
def limit_epochs(tensor, num_epochs=None, name=None):
  """Returns tensor `num_epochs` times and then raises an `OutOfRange` error.

  Note: creates local counter `epochs`. Use `local_variables_initializer()` to
  initialize local variables.

  Args:
    tensor: Any `Tensor`.
    num_epochs: A positive integer (optional).  If specified, limits the number
      of steps the output tensor may be evaluated. If `None`, `tensor` is
      returned unchanged and no counter is created.
    name: A name for the operations (optional).

  Returns:
    tensor or `OutOfRange`.

  Raises:
    ValueError: if `num_epochs` is invalid.
  """
  if num_epochs is None:
    # Unlimited: nothing to count, hand the input straight back.
    return tensor
  if num_epochs <= 0:
    raise ValueError("num_epochs must be > 0 not %d." % num_epochs)
  with ops.name_scope(name, "limit_epochs", [tensor]) as scope:
    epoch_counter = vs.variable(
        constant_op.constant(0, dtype=dtypes.int64),
        name="epochs",
        trainable=False,
        collections=[ops.GraphKeys.LOCAL_VARIABLES])
    # Each evaluation of the output bumps the counter; once it reaches
    # `num_epochs` the increment op fails with OutOfRange.
    increment = epoch_counter.count_up_to(num_epochs)
    with ops.control_dependencies([increment]):
      return array_ops.identity(tensor, name=scope)
115
116
@tf_export(v1=["train.input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(input_tensor).shuffle"
    "(tf.shape(input_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def input_producer(input_tensor,
                   element_shape=None,
                   num_epochs=None,
                   shuffle=True,
                   seed=None,
                   capacity=32,
                   shared_name=None,
                   summary_name=None,
                   name=None,
                   cancel_op=None):
  """Output the rows of `input_tensor` to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    input_tensor: A tensor with the rows to produce. Must be at least
      one-dimensional. Must either have a fully-defined shape, or
      `element_shape` must be defined.
    element_shape: (Optional.) A `TensorShape` representing the shape of a
      row of `input_tensor`, if it cannot be inferred.
    num_epochs: (Optional.) An integer. If specified `input_producer` produces
      each row of `input_tensor` `num_epochs` times before generating an
      `OutOfRange` error. If not specified, `input_producer` can cycle through
      the rows of `input_tensor` an unlimited number of times.
    shuffle: (Optional.) A boolean. If true, the rows are randomly shuffled
      within each epoch.
    seed: (Optional.) An integer. The seed to use if `shuffle` is true.
    capacity: (Optional.) The capacity of the queue to be used for buffering
      the input.
    shared_name: (Optional.) If set, this queue will be shared under the given
      name across multiple sessions.
    summary_name: (Optional.) If set, a scalar summary for the current queue
      size (as a fraction of `capacity`) will be generated, using this name as
      part of the tag.
    name: (Optional.) A name for queue.
    cancel_op: (Optional.) Cancel op for the queue

  Returns:
    A queue with the output rows.  A `QueueRunner` for the queue is
    added to the current `QUEUE_RUNNER` collection of the current
    graph.

  Raises:
    ValueError: If the shape of the input cannot be inferred from the arguments.
    RuntimeError: If called with eager execution enabled.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  if context.executing_eagerly():
    raise RuntimeError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  with ops.name_scope(name, "input_producer", [input_tensor]):
    rows = ops.convert_to_tensor(input_tensor, name="input_tensor")
    # Per-row shape: static shape minus the leading dim, refined by the
    # caller-provided `element_shape`.
    row_shape = rows.shape[1:].merge_with(element_shape)
    if not row_shape.is_fully_defined():
      raise ValueError("Either `input_tensor` must have a fully defined shape "
                       "or `element_shape` must be specified")

    if shuffle:
      rows = random_ops.random_shuffle(rows, seed=seed)
    rows = limit_epochs(rows, num_epochs)

    fifo = data_flow_ops.FIFOQueue(capacity=capacity,
                                   dtypes=[rows.dtype.base_dtype],
                                   shapes=[row_shape],
                                   shared_name=shared_name,
                                   name=name)
    enqueue_op = fifo.enqueue_many([rows])
    runner = queue_runner.QueueRunner(fifo, [enqueue_op], cancel_op=cancel_op)
    queue_runner.add_queue_runner(runner)

    if summary_name is not None:
      fill_fraction = (
          math_ops.cast(fifo.size(), dtypes.float32) * (1. / capacity))
      summary.scalar(summary_name, fill_fraction)
    return fifo
203
204
@tf_export(v1=["train.string_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(string_tensor).shuffle"
    "(tf.shape(string_tensor, out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def string_input_producer(string_tensor,
                          num_epochs=None,
                          shuffle=True,
                          seed=None,
                          capacity=32,
                          shared_name=None,
                          name=None,
                          cancel_op=None):
  """Output strings (e.g. filenames) to a queue for an input pipeline.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    string_tensor: A 1-D string tensor with the strings to produce.
    num_epochs: An integer (optional). If specified, `string_input_producer`
      produces each string from `string_tensor` `num_epochs` times before
      generating an `OutOfRange` error. If not specified,
      `string_input_producer` can cycle through the strings in `string_tensor`
      an unlimited number of times.
    shuffle: Boolean. If true, the strings are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions. All sessions open to the device which has
      this queue will be able to access it via the shared_name. Using this in
      a distributed setting means each name will only be seen by one of the
      sessions which has access to this operation.
    name: A name for the operations (optional).
    cancel_op: Cancel op for the queue (optional).

  Returns:
    A queue with the output strings.  A `QueueRunner` for the Queue
    is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  Raises:
    ValueError: If the string_tensor is a null Python list.  At runtime,
      will fail with an assertion if string_tensor becomes a null tensor.
    RuntimeError: If called with eager execution enabled (raised by
      `input_producer`).

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  not_null_err = "string_input_producer requires a non-null input tensor"
  # Reject an empty Python input up front; a `Tensor` can only be checked for
  # emptiness at runtime, which the Assert below handles.
  if not isinstance(string_tensor, ops.Tensor) and not string_tensor:
    raise ValueError(not_null_err)

  with ops.name_scope(name, "input_producer", [string_tensor]) as name:
    string_tensor = ops.convert_to_tensor(string_tensor, dtype=dtypes.string)
    # Make every consumer of `string_tensor` wait on the non-empty check.
    with ops.control_dependencies([
        control_flow_ops.Assert(
            math_ops.greater(array_ops.size(string_tensor), 0),
            [not_null_err])]):
      string_tensor = array_ops.identity(string_tensor)
    return input_producer(
        input_tensor=string_tensor,
        element_shape=[],
        num_epochs=num_epochs,
        shuffle=shuffle,
        seed=seed,
        capacity=capacity,
        shared_name=shared_name,
        name=name,
        summary_name="fraction_of_%d_full" % capacity,
        cancel_op=cancel_op)
278
279
@tf_export(v1=["train.range_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.range(limit).shuffle(limit).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def range_input_producer(limit, num_epochs=None, shuffle=True, seed=None,
                         capacity=32, shared_name=None, name=None):
  """Produces the integers from 0 to limit-1 in a queue.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    limit: An int32 scalar tensor.
    num_epochs: An integer (optional). If specified, `range_input_producer`
      produces each integer `num_epochs` times before generating an
      OutOfRange error. If not specified, `range_input_producer` can cycle
      through the integers an unlimited number of times.
    shuffle: Boolean. If true, the integers are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: A name for the operations (optional).

  Returns:
    A Queue with the output integers.  A `QueueRunner` for the Queue
    is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  with ops.name_scope(name, "input_producer", [limit]) as scope:
    indices = math_ops.range(limit)
    # Each queue element is a scalar index, hence element_shape=[].
    return input_producer(
        indices,
        element_shape=[],
        num_epochs=num_epochs,
        shuffle=shuffle,
        seed=seed,
        capacity=capacity,
        shared_name=shared_name,
        summary_name="fraction_of_%d_full" % capacity,
        name=scope)
320
321
@tf_export(v1=["train.slice_input_producer"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.from_tensor_slices(tuple(tensor_list)).shuffle"
    "(tf.shape(tensor_list[0], out_type=tf.int64)[0]).repeat(num_epochs)`. If "
    "`shuffle=False`, omit the `.shuffle(...)`.")
def slice_input_producer(tensor_list, num_epochs=None, shuffle=True, seed=None,
                         capacity=32, shared_name=None, name=None):
  """Produces a slice of each `Tensor` in `tensor_list`.

  Implemented using a Queue -- a `QueueRunner` for the Queue
  is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  Note: if `num_epochs` is not `None`, this function creates local counter
  `epochs`. Use `local_variables_initializer()` to initialize local variables.

  Args:
    tensor_list: A list of `Tensor` objects. Every `Tensor` in
      `tensor_list` must have the same size in the first dimension.
    num_epochs: An integer (optional). If specified, `slice_input_producer`
      produces each slice `num_epochs` times before generating
      an `OutOfRange` error. If not specified, `slice_input_producer` can cycle
      through the slices an unlimited number of times.
    shuffle: Boolean. If true, the slices are randomly shuffled within each
      epoch.
    seed: An integer (optional). Seed used if shuffle == True.
    capacity: An integer. Sets the queue capacity.
    shared_name: (optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: A name for the operations (optional).

  Returns:
    A list of tensors, one for each element of `tensor_list`.  If the tensor
    in `tensor_list` has shape `[N, a, b, .., z]`, then the corresponding output
    tensor will have shape `[a, b, ..., z]`.

  Raises:
    ValueError: if `slice_input_producer` produces nothing from `tensor_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  with ops.name_scope(name, "input_producer", tensor_list):
    tensor_list = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
    if not tensor_list:
      raise ValueError(
          "Expected at least one tensor in slice_input_producer().")
    # The number of slices is taken from the first tensor only.
    range_size = array_ops.shape(tensor_list[0])[0]
    # TODO(josh11b): Add an assertion that the first dimension of
    # everything in TensorList matches. Maybe just check the inferred shapes?
    queue = range_input_producer(range_size, num_epochs=num_epochs,
                                 shuffle=shuffle, seed=seed, capacity=capacity,
                                 shared_name=shared_name)
    # One dequeued row index selects the matching slice of every tensor.
    index = queue.dequeue()
    output = [array_ops.gather(t, index) for t in tensor_list]
    return output
377
378
379# Helpers for the batching functions ------------------------------------------
380
381
def _flatten(tensor_list_list):
  """Concatenate a list of lists into one flat list, preserving order."""
  flat = []
  for tensor_list in tensor_list_list:
    flat.extend(tensor_list)
  return flat
384
385
class _SparseMetaData(object):
  """Store information about the Tensor: Is it sparse?, map_op, and rank."""

  def __init__(self, sparse, map_op, rank):
    """Create the metadata.

    Args:
      sparse: Python boolean.
      map_op: The `Operation` that created the `SparseTensorsMap` in question.
        This Op contains information about the underlying Map object and the
        dtype of the original data. `None` for non-sparse entries.
      rank: The statically known rank of the `SparseTensor` (may be `None`).
    """
    self._sparse = sparse
    self._map_op = map_op
    self._rank = tensor_shape.as_dimension(rank)

  def __eq__(self, other):
    """Two metadata objects match if both are dense, or both are sparse with
    the same map op and compatible ranks."""
    if self.sparse != other.sparse:
      return False
    if not self.sparse:
      return True
    # If map_ops are not the same, the data source is not the same.
    if (self.map_op is not None) != (other.map_op is not None):
      return False
    if self.map_op != other.map_op:
      return False
    if not self.rank.is_compatible_with(other.rank):
      return False
    return True

  def __ne__(self, other):
    return not self.__eq__(other)

  def __str__(self):
    # Dense entries carry no map op; guard before reading `.name` so that
    # formatting an incompatibility error does not itself raise.
    map_op_name = self.map_op.name if self.map_op is not None else None
    return "[SparseMetaData(%s, %s, %s)]" % (self.sparse, map_op_name,
                                             self.rank)

  def merge_with(self, other):
    """Merge `other` into this metadata in place and return `self`.

    Raises:
      ValueError: if `other` is incompatible with this metadata.
    """
    if self != other:
      raise ValueError("SparseMetaData objects are incompatible: %s vs. %s"
                       % (self, other))
    if self.sparse:
      # `Dimension.merge_with` returns a new Dimension rather than updating
      # the receiver, so store the result to actually refine the rank.
      self._rank = self.rank.merge_with(other.rank)
    return self

  @property
  def map_op(self):
    return self._map_op

  @property
  def sparse(self):
    return self._sparse

  @property
  def rank(self):
    return self._rank
443
444
def _as_tensor_list(tensors):
  """Return dict values ordered by the `str` of their keys; else passthrough."""
  if not isinstance(tensors, dict):
    return tensors
  ordered_keys = sorted(tensors, key=str)
  return [tensors[key] for key in ordered_keys]
450
451
def _as_tensor_list_list(tensors_list):
  """Normalize a list of tensor collections into a list of tensor lists.

  If the first element is a dict, every element must be a dict with the same
  keys, and each is converted with `_as_tensor_list`. Otherwise the input is
  returned unchanged.

  Raises:
    ValueError: if `tensors_list` is empty, or the dicts' keys differ.
  """
  if not tensors_list:
    raise ValueError("Expected at least one set of tensors")
  head = tensors_list[0]
  if not isinstance(head, dict):
    return tensors_list
  expected_keys = set(head.keys())
  if any(set(tensors.keys()) != expected_keys for tensors in tensors_list[1:]):
    raise ValueError("All dictionaries in tensors_list must have "
                     "the same keys")
  return [_as_tensor_list(tensors) for tensors in tensors_list]
464
465
def _as_original_type(original_tensors, tensor_list):
  """Rebuild a dict from `tensor_list` if the original input was a dict.

  Keys are matched to list positions in `str`-sorted key order. Non-dict
  originals get `tensor_list` back unchanged.
  """
  if not isinstance(original_tensors, dict):
    return tensor_list
  if len(original_tensors) == 1:
    # tensor_list is bogusly returned as a single tensor if only one tensor
    # was enqueued.  Make it a list again.  See b/28117485.
    tensor_list = [tensor_list]
  restored = {}
  for position, key in enumerate(sorted(original_tensors, key=str)):
    restored[key] = tensor_list[position]
  return restored
476
477
def _store_sparse_tensors(tensor_list, enqueue_many, keep_input,
                          shared_map_ops=None):
  """Store SparseTensors for feeding into batch, etc.

  Each `SparseTensor` in `tensor_list` is written into a `SparseTensorsMap`
  and replaced by int64 handle(s) that can travel through a queue; dense
  `Tensor`s pass through unchanged.

  If `shared_map_ops` is provided, the underlying `SparseTensorsMap` objects
  are reused (shared).  This argument is useful for, e.g., `batch_join`
  where multiple enqueue operations write to the same Queue component,
  and another (dequeue) thread reads from that same location and must then
  restore the associated `SparseTensor` objects.  In this case, the sparse
  restore must have a single `SparseTensorsMap` from which to read out the
  handles; so a single `SparseTensorsMap` must be shared for storing
  across the multiple enqueue operations.  This sharing is performed by
  calling `_store_sparse_tensors` the first time with `shared_map_ops=None`,
  and then in subsequent times with this value set to the list of `Operation`
  objects created in the first call.

  Args:
    tensor_list: List of `Tensor` and `SparseTensor` objects.
    enqueue_many: Python `Boolean`.
    keep_input: A bool Tensor (not a Python bool), either a scalar or a
      vector. A scalar False means don't store (a -1 placeholder handle is
      produced instead); a vector drops the sparse values of rows whose
      entry is False before storing.
    shared_map_ops: (optional) List of `Operation` objects from a previous
      call to `_store_sparse_tensors`.  If not `None`, the op types should be
      one of `AddSparseToTensorsMap` or `AddManySparseToTensorsMap` in the
      locations corresponding to `SparseTensors` in `tensor_list`.

  Returns:
    A tuple `(stored_list, sparse_info_list)` where `stored_list` is a list
    of `Tensor` objects (same length as `tensor_list`) and `sparse_info_list`
    is a list of the same length of `_SparseMetaData` objects.
  """
  # One optional shared map op per component (all None on a first call).
  maybe_shared_map_ops = shared_map_ops or [None] * len(tensor_list)

  def _sparse_meta_data(t, storing_op, map_op):
    """Build the `_SparseMetaData` describing component `t`."""
    if not isinstance(t, sparse_tensor.SparseTensor):
      return _SparseMetaData(False, None, None)
    # Static rank = static length of the 1-D `dense_shape` vector.
    rank = t.dense_shape.shape.with_rank(1).dims[0]
    if enqueue_many:
      # Handles are stored per row of the first dimension, so each stored
      # element has one dimension fewer.
      rank -= 1
    # If a shared map_op was provided, use that. Otherwise use the operation
    # that was used to store the SparseTensor.
    return _SparseMetaData(
        sparse=True, map_op=map_op or storing_op, rank=rank)

  def _maybe_store(t, shared_map_op):
    """Store Sparse tensor, if necessary."""
    if not isinstance(t, sparse_tensor.SparseTensor):
      return t
    # The shared map is addressed through its op's name (`shared_name`).
    map_op_name = shared_map_op.name if shared_map_op else None
    def _maybe_store_sparse(t, map_op_name, keep_input):
      """Conditionally store a single sparse Tensor."""
      return utils.smart_cond(
          keep_input,
          lambda: _store_sparse(t, shared_name=map_op_name),
          lambda: constant_op.constant(-1, dtypes.int64))
    def _maybe_store_many_sparse(t, map_op_name, keep_input):
      """Conditionally store multiple sparse Tensors."""
      out_tensor = utils.smart_cond(
          keep_input,
          lambda: _store_many_sparse(t, shared_name=map_op_name),
          # One -1 placeholder handle per row of the first dimension.
          lambda: -1 * array_ops.ones(array_ops.shape(t)[0:1], dtypes.int64))
      out_tensor.set_shape([None])  # necessary when t.ndims is unknown
      return out_tensor
    def _sparse_values_to_keep(t, keep_input):
      """Convert a per-row `keep_input` vector to a per-value one."""
      # Get the rows of every value in the sparse Tensor.
      row_values = t.indices[:, 0]
      # The value should be kept iff the row should be kept.
      return array_ops.gather(keep_input, row_values)
    if keep_input.shape.ndims == 1:
      # Vector keep_input: filter values up front, then always store.
      t = sparse_ops.sparse_retain(t, _sparse_values_to_keep(t, keep_input))
      store_f = lambda t, name, _: _store_many_sparse(t, shared_name=name)
    elif enqueue_many:
      store_f = _maybe_store_many_sparse
    else:
      store_f = _maybe_store_sparse
    return store_f(t, map_op_name, keep_input)

  stored_list = [
      _maybe_store(t, shared_map_op) for t, shared_map_op
      in zip(tensor_list, maybe_shared_map_ops)]
  # Since the output of `_store{_many}_sparse` may be wrapped in a tf.cond
  # `Merge`, we can't just get the Op of the resulting tensor.
  def _sparse_op(stored):
    """Find the Add*SparseToTensorsMap op that produced `stored`."""
    for input_tensor in stored.op.inputs:
      if input_tensor.op.type in ("AddSparseToTensorsMap",
                                  "AddManySparseToTensorsMap"):
        return input_tensor.op
    # Otherwise no input came from a storing op: either `stored` was produced
    # directly by the storing op (no cond was built), or the original Tensor
    # wasn't sparse. Either way, return the stored Tensor's own Op.
    return stored.op
  sparse_info_list = [
      _sparse_meta_data(t, _sparse_op(stored), shared_map_op)
      for t, stored, shared_map_op
      in zip(tensor_list, stored_list, maybe_shared_map_ops)]
  # Expand dims of stored sparse handles by 1 (a trailing size-1 axis) for
  # proper enqueue shape; dense tensors are left as-is.
  stored_list = [
      array_ops.expand_dims(s, [-1]) if s_info.sparse else s
      for s, s_info in zip(stored_list, sparse_info_list)]
  return stored_list, sparse_info_list
578
579
def _store_sparse_tensors_join(tensor_list_list, enqueue_many, keep_input):
  """Store SparseTensors for feeding into batch_join, etc.

  The first tensor list creates the `SparseTensorsMap` ops; every later list
  stores into those same maps so a single dequeue can restore all of them.

  Returns:
    A tuple `(stored_list_list, sparse_info_list)`.

  Raises:
    ValueError: if the sparse layout of a later list does not match the first.
  """
  first_list = tensor_list_list[0]
  first_stored, sparse_info_list = _store_sparse_tensors(
      first_list, enqueue_many, keep_input)
  stored_list_list = [first_stored]
  for tensor_list in tensor_list_list[1:]:
    shared_map_ops = [info.map_op for info in sparse_info_list]
    stored, candidate_info = _store_sparse_tensors(
        tensor_list, enqueue_many, keep_input, shared_map_ops)
    if sparse_info_list != candidate_info:
      raise ValueError("Inconsistent SparseTensors list: %s vs. %s"
                       % (first_list, tensor_list))
    merged_info = []
    for info, candidate in zip(sparse_info_list, candidate_info):
      merged_info.append(info.merge_with(candidate))
    sparse_info_list = merged_info
    stored_list_list.append(stored)
  return stored_list_list, sparse_info_list
598
599
def _restore_sparse_tensors(stored_list, sparse_info_list):
  """Restore SparseTensors after dequeue in batch, batch_join, etc.

  Args:
    stored_list: The dequeued component(s): a sequence of `Tensor`s, or a
      single `Tensor` (treated as a one-element sequence).
    sparse_info_list: `_SparseMetaData` objects parallel to `stored_list`,
      as produced by `_store_sparse_tensors`.

  Returns:
    If `stored_list` was a sequence: a list in which each sparse component is
    replaced by the `SparseTensor` read back from its `SparseTensorsMap`, and
    every output carries control dependencies on all restored values / dense
    outputs. Otherwise: the single restored element.
  """
  received_sequence = isinstance(stored_list, collections_abc.Sequence)
  if not received_sequence:
    stored_list = (stored_list,)
  # Sparse components are handle tensors whose axis 1 is the size-1 axis
  # added by `expand_dims` when storing; squeeze it off before the lookup.
  # NOTE(review): `rank + 1` presumably accounts for the leading batch
  # dimension of the restored minibatch -- confirm against the
  # TakeManySparseFromTensorsMap op documentation.
  tensors = [
      _restore_sparse(sparse_map_op=info.map_op,
                      sparse_handles=array_ops.squeeze(s, [1]),
                      rank=tensor_shape.dimension_value(info.rank + 1))
      if info.sparse else s
      for (s, info) in zip(stored_list, sparse_info_list)]
  has_st = any(isinstance(x, sparse_tensor.SparseTensor) for x in tensors)
  if has_st:
    # Gather the `values` of each restored SparseTensor plus every dense
    # output, then rebuild every output with control dependencies on that
    # whole set, so no output can be consumed before all of them are computed.
    t_values = [
        x.values if isinstance(x, sparse_tensor.SparseTensor)
        else x
        for x in tensors]
    with_deps = lambda x: control_flow_ops.with_dependencies(t_values, x)
    ensure_restore_tensors = [
        sparse_tensor.SparseTensor(indices=with_deps(x.indices),
                                   values=with_deps(x.values),
                                   dense_shape=with_deps(x.dense_shape))
        if isinstance(x, sparse_tensor.SparseTensor)
        else with_deps(x)
        for x in tensors]
  else:
    ensure_restore_tensors = tensors
  # NOTE(review): the non-sequence path returns `tensors[0]`, not
  # `ensure_restore_tensors[0]`, so it skips the control dependencies added
  # above (with a single output they would only depend on itself).
  return ensure_restore_tensors if received_sequence else tensors[0]
628
629
def _validate(tensor_list):
  """Convert `tensor_list` to tensors and ensure it is non-empty.

  Raises:
    ValueError: if `tensor_list` is empty.
  """
  converted = ops.convert_n_to_tensor_or_indexed_slices(tensor_list)
  if converted:
    return converted
  raise ValueError("Expected at least one tensor in batch().")
635
636
def _validate_join(tensor_list_list):
  """Convert each inner list to tensors and ensure at least one list exists.

  Raises:
    ValueError: if `tensor_list_list` is empty.
  """
  converted = []
  for tl in tensor_list_list:
    converted.append(ops.convert_n_to_tensor_or_indexed_slices(tl))
  if not converted:
    raise ValueError("Expected at least one input in batch_join().")
  return converted
643
644
def _validate_keep_input(keep_input, enqueue_many):
  """Validate `keep_input` argument to conditional batching functions.

  Returns:
    `keep_input` converted to a `Tensor` of statically known rank 0 or 1
    (rank 1 only when `enqueue_many` is true).

  Raises:
    ValueError: if the rank is unknown, is 1 without `enqueue_many`, or > 1.
  """
  keep_input = ops.convert_to_tensor(keep_input)
  rank = keep_input.shape.ndims
  if rank is None:
    raise ValueError(
        "`keep_input` dimensions must be known at graph construction.")
  if rank == 1 and not enqueue_many:
    raise ValueError(
        "`keep_input` cannot be a vector when `enqueue_many=False`.")
  if rank > 1:
    raise ValueError("`keep_input` must be 0 or 1 dimensions.")
  return keep_input
657
658
def _dtypes(tensor_list_list):
  """Return the component dtypes shared by every list in `tensor_list_list`.

  Raises:
    TypeError: if any list's dtypes differ from those of the first list.
  """
  dtype_lists = [[t.dtype for t in tl] for tl in tensor_list_list]
  reference = dtype_lists[0]
  for candidate in dtype_lists[1:]:
    if candidate == reference:
      continue
    raise TypeError("Expected types to be consistent: %s vs. %s." %
                    (", ".join(x.name for x in reference),
                     ", ".join(x.name for x in candidate)))
  return reference
668
669
def _merge_shapes(shape_list, enqueue_many):
  """Merge the static shapes of one queue component across input lists.

  Args:
    shape_list: Non-empty list of shapes (anything `tensor_shape.as_shape`
      accepts) for the same component, one per input tensor list.
    enqueue_many: If true, each shape has a leading batch dimension which is
      stripped before merging.

  Returns:
    The merged per-element shape as a Python list (`None` for unknown dims).

  Raises:
    ValueError: if the shapes are incompatible, or if `enqueue_many` is true
      and a shape has rank 0.
  """
  shape_list = [tensor_shape.as_shape(s) for s in shape_list]
  if enqueue_many:
    # We want the shapes without the leading batch dimension.
    shape_list = [s.with_rank_at_least(1)[1:] for s in shape_list]
  merged_shape = shape_list[0]
  for s in shape_list[1:]:
    # `TensorShape.merge_with` returns a new shape instead of updating
    # `merged_shape`, so reassign; otherwise only the first shape's static
    # information would survive.
    merged_shape = merged_shape.merge_with(s)
  return merged_shape.as_list()
679
680
def _shapes(tensor_list_list, shapes, enqueue_many):
  """Calculate and merge the shapes of incoming tensors.

  Args:
    tensor_list_list: List of tensor lists.
    shapes: List of shape tuples corresponding to tensors within the lists,
      or `None` to infer them.
    enqueue_many: Boolean describing whether shapes will be enqueued as
      batches or individual entries.

  Returns:
    `shapes` unchanged if it is not `None`; otherwise one merged shape per
    component, aggregated across every list in `tensor_list_list`.

  Raises:
    ValueError: If any of the inferred shapes in `tensor_list_list` lack a
      well defined rank.
  """
  if shapes is not None:
    # Caller-supplied shapes take precedence over inference.
    return shapes

  num_components = len(tensor_list_list[0])
  # Every component of every list needs a known rank before merging.
  for tensor_list in tensor_list_list:
    for idx in xrange(num_components):
      candidate = tensor_list[idx]
      if candidate.shape.ndims is None:
        raise ValueError("Cannot infer Tensor's rank: %s" % candidate)

  merged = []
  for idx in xrange(num_components):
    per_list_shapes = [tl[idx].shape.as_list() for tl in tensor_list_list]
    merged.append(_merge_shapes(per_list_shapes, enqueue_many))
  return merged
710
711
def _select_which_to_enqueue(tensor_list, keep_input):
  """Select which examples to enqueue based on vector `keep_input`.

  Rows whose `keep_input` entry is True are kept from each tensor.
  """
  partition_ids = math_ops.cast(keep_input, dtypes.int32)
  kept = []
  for component in tensor_list:
    # Partition 0 gets rows with keep_input False, partition 1 those with True.
    _, selected = data_flow_ops.dynamic_partition(
        component, partition_ids, num_partitions=2)
    kept.append(selected)
  return kept
719
720
def _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input):
  """Enqueue `tensor_list_list` in `queue`.

  One enqueue op is built per tensor list, and all of them are registered
  with a single `QueueRunner`.

  Args:
    queue: The queue to enqueue into.
    tensor_list_list: List of tensor lists, one per enqueue op.
    enqueue_many: If true use `queue.enqueue_many`, else `queue.enqueue`.
    keep_input: Validated bool `Tensor`. A vector selects individual examples
      to keep; a scalar gates each whole enqueue (falling back to a no-op).
  """
  enqueue_fn = queue.enqueue_many if enqueue_many else queue.enqueue
  if keep_input.shape.ndims == 1:
    enqueue_ops = [
        enqueue_fn(_select_which_to_enqueue(tensor_list, keep_input))
        for tensor_list in tensor_list_list]
  else:
    enqueue_ops = []
    for tensor_list in tensor_list_list:
      # Bind the current list as a default argument so the branch function
      # does not rely on late binding of the loop variable.
      enqueue_ops.append(utils.smart_cond(
          keep_input,
          lambda tl=tensor_list: enqueue_fn(tl),
          control_flow_ops.no_op))
  queue_runner.add_queue_runner(queue_runner.QueueRunner(queue, enqueue_ops))
736
737
def _enqueue(queue, tensor_list, threads, enqueue_many, keep_input):
  """Register a QueueRunner that enqueues `tensor_list` from `threads` threads.

  A single enqueue op is built and listed `threads` times in the QueueRunner.

  Args:
    queue: The queue to feed.
    tensor_list: List of tensors to enqueue.
    threads: How many copies of the enqueue op the QueueRunner gets.
    enqueue_many: If true use `queue.enqueue_many`, else `queue.enqueue`.
    keep_input: Validated boolean tensor. A vector filters individual
      examples; otherwise it gates the whole enqueue through `smart_cond`.
  """
  enqueue_fn = queue.enqueue_many if enqueue_many else queue.enqueue
  if keep_input.shape.ndims == 1:
    enqueue_op = enqueue_fn(_select_which_to_enqueue(tensor_list, keep_input))
  else:
    enqueue_op = utils.smart_cond(
        keep_input,
        lambda: enqueue_fn(tensor_list),
        control_flow_ops.no_op)
  runner = queue_runner.QueueRunner(queue, [enqueue_op] * threads)
  queue_runner.add_queue_runner(runner)
753
754
def _which_queue(dynamic_pad):
  """Pick the FIFO queue class for a batching pipeline.

  Args:
    dynamic_pad: Whether variable-length dimensions must be padded on dequeue.

  Returns:
    `data_flow_ops.PaddingFIFOQueue` when `dynamic_pad` is truthy, otherwise
    `data_flow_ops.FIFOQueue`.
  """
  if not dynamic_pad:
    return data_flow_ops.FIFOQueue
  return data_flow_ops.PaddingFIFOQueue
758
759
def _batch(tensors, batch_size, keep_input, num_threads=1, capacity=32,
           enqueue_many=False, shapes=None, dynamic_pad=False,
           allow_smaller_final_batch=False, shared_name=None,
           name=None):
  """Shared implementation of `batch` and `maybe_batch`.

  Builds a (padding) FIFO queue fed by `num_threads` copies of an enqueue op
  that honours `keep_input`, records a fill-level summary, and returns a
  dequeue of `batch_size` elements converted back to the structure of
  `tensors`.

  Raises:
    ValueError: If eager execution is enabled.
  """
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  flat = _as_tensor_list(tensors)
  scope_values = list(flat) + [keep_input]
  with ops.name_scope(name, "batch", scope_values) as name:
    flat = _validate(flat)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    flat, sparse_info = _store_sparse_tensors(flat, enqueue_many, keep_input)
    component_types = _dtypes([flat])
    component_shapes = _shapes([flat], shapes, enqueue_many)
    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
    queue_cls = _which_queue(dynamic_pad)
    queue = queue_cls(capacity=capacity, dtypes=component_types,
                      shapes=component_shapes, shared_name=shared_name)
    _enqueue(queue, flat, num_threads, enqueue_many, keep_input)

    summary_name = "fraction_of_%d_full" % capacity
    fill_fraction = (math_ops.cast(queue.size(), dtypes.float32) *
                     (1. / capacity))
    summary.scalar(summary_name, fill_fraction)

    dequeue = (queue.dequeue_up_to if allow_smaller_final_batch
               else queue.dequeue_many)
    batched = dequeue(batch_size, name=name)
    batched = _restore_sparse_tensors(batched, sparse_info)
    return _as_original_type(tensors, batched)
792
793
# TODO(josh11b): Add a thread_multiplier or num_threads (that has to be
# a multiple of len(tensor_list_list)?) parameter, to address the use
# case where you want more parallelism than you can support different
# readers (either because you don't have that many files or can't
# read that many files in parallel due to the number of seeks required).
# Once this is done, batch() can be written as a call to batch_join().
def _batch_join(tensors_list, batch_size, keep_input, capacity=32,
                enqueue_many=False, shapes=None, dynamic_pad=False,
                allow_smaller_final_batch=False, shared_name=None, name=None):
  """Shared implementation of `batch_join` and `maybe_batch_join`.

  Each entry of `tensors_list` gets its own enqueue op in the QueueRunner,
  all feeding one (padding) FIFO queue. The result is a dequeue of
  `batch_size` elements in the structure of `tensors_list[0]`.

  Raises:
    ValueError: If eager execution is enabled.
  """
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  nested = _as_tensor_list_list(tensors_list)
  scope_values = _flatten(nested) + [keep_input]
  with ops.name_scope(name, "batch_join", scope_values) as name:
    nested = _validate_join(nested)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    nested, sparse_info = _store_sparse_tensors_join(
        nested, enqueue_many, keep_input)
    component_types = _dtypes(nested)
    component_shapes = _shapes(nested, shapes, enqueue_many)
    # TODO(josh11b,mrry): Switch to BatchQueue once it is written.
    queue_cls = _which_queue(dynamic_pad)
    queue = queue_cls(capacity=capacity, dtypes=component_types,
                      shapes=component_shapes, shared_name=shared_name)
    _enqueue_join(queue, nested, enqueue_many, keep_input)

    summary_name = "fraction_of_%d_full" % capacity
    fill_fraction = (math_ops.cast(queue.size(), dtypes.float32) *
                     (1. / capacity))
    summary.scalar(summary_name, fill_fraction)

    dequeue = (queue.dequeue_up_to if allow_smaller_final_batch
               else queue.dequeue_many)
    batched = dequeue(batch_size, name=name)
    batched = _restore_sparse_tensors(batched, sparse_info)
    # `tensors_list` was validated above to be non-empty.
    return _as_original_type(tensors_list[0], batched)
833
834
def _shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                   keep_input, num_threads=1, seed=None, enqueue_many=False,
                   shapes=None, allow_smaller_final_batch=False,
                   shared_name=None, name=None):
  """Shared implementation of `shuffle_batch` and `maybe_shuffle_batch`.

  Feeds `tensors` from `num_threads` copies of an enqueue op into a
  `RandomShuffleQueue`, records how full the queue is above
  `min_after_dequeue`, and returns a dequeue of `batch_size` elements in the
  structure of `tensors`.

  Raises:
    ValueError: If eager execution is enabled, or if `capacity` is not
      strictly greater than `min_after_dequeue`.
  """
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  flat = _as_tensor_list(tensors)
  scope_values = list(flat) + [keep_input]
  with ops.name_scope(name, "shuffle_batch", scope_values) as name:
    if capacity <= min_after_dequeue:
      raise ValueError("capacity %d must be bigger than min_after_dequeue %d."
                       % (capacity, min_after_dequeue))
    flat = _validate(flat)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    flat, sparse_info = _store_sparse_tensors(flat, enqueue_many, keep_input)
    component_types = _dtypes([flat])
    component_shapes = _shapes([flat], shapes, enqueue_many)
    queue = data_flow_ops.RandomShuffleQueue(
        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
        dtypes=component_types, shapes=component_shapes,
        shared_name=shared_name)
    _enqueue(queue, flat, num_threads, enqueue_many, keep_input)

    # Fraction of the slots above `min_after_dequeue` that are occupied.
    surplus = math_ops.maximum(0, queue.size() - min_after_dequeue)
    full = (math_ops.cast(surplus, dtypes.float32) *
            (1. / (capacity - min_after_dequeue)))
    summary_name = (
        "fraction_over_%d_of_%d_full" %
        (min_after_dequeue, capacity - min_after_dequeue))
    summary.scalar(summary_name, full)

    dequeue = (queue.dequeue_up_to if allow_smaller_final_batch
               else queue.dequeue_many)
    shuffled = dequeue(batch_size, name=name)
    shuffled = _restore_sparse_tensors(shuffled, sparse_info)
    return _as_original_type(tensors, shuffled)
877
878
def _shuffle_batch_join(tensors_list, batch_size, capacity,
                        min_after_dequeue, keep_input, seed=None,
                        enqueue_many=False, shapes=None,
                        allow_smaller_final_batch=False, shared_name=None,
                        name=None):
  """Helper function for `shuffle_batch_join` and `maybe_shuffle_batch_join`.

  Each entry of `tensors_list` gets its own enqueue op in the QueueRunner,
  all feeding one `RandomShuffleQueue`. A summary records how full the queue
  is above `min_after_dequeue`, and the result is a dequeue of `batch_size`
  elements in the structure of `tensors_list[0]`.

  Raises:
    ValueError: If eager execution is enabled, or if `capacity` is not
      strictly greater than `min_after_dequeue`.
  """
  if context.executing_eagerly():
    raise ValueError(
        "Input pipelines based on Queues are not supported when eager execution"
        " is enabled. Please use tf.data to ingest data into your model"
        " instead.")
  tensor_list_list = _as_tensor_list_list(tensors_list)
  with ops.name_scope(name, "shuffle_batch_join",
                      _flatten(tensor_list_list) + [keep_input]) as name:
    # Same validation as `_shuffle_batch`. Without it, capacity ==
    # min_after_dequeue divides by zero in the summary below, and
    # capacity < min_after_dequeue builds a queue from which a dequeue could
    # never leave `min_after_dequeue` elements behind.
    if capacity <= min_after_dequeue:
      raise ValueError("capacity %d must be bigger than min_after_dequeue %d."
                       % (capacity, min_after_dequeue))
    tensor_list_list = _validate_join(tensor_list_list)
    keep_input = _validate_keep_input(keep_input, enqueue_many)
    tensor_list_list, sparse_info = _store_sparse_tensors_join(
        tensor_list_list, enqueue_many, keep_input)
    types = _dtypes(tensor_list_list)
    shapes = _shapes(tensor_list_list, shapes, enqueue_many)
    queue = data_flow_ops.RandomShuffleQueue(
        capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
        dtypes=types, shapes=shapes, shared_name=shared_name)
    _enqueue_join(queue, tensor_list_list, enqueue_many, keep_input)
    # Fraction of the slots above `min_after_dequeue` that are occupied.
    full = (math_ops.cast(
        math_ops.maximum(0, queue.size() - min_after_dequeue), dtypes.float32) *
            (1. / (capacity - min_after_dequeue)))
    summary_name = (
        "fraction_over_%d_of_%d_full" %
        (min_after_dequeue, capacity - min_after_dequeue))
    summary.scalar(summary_name, full)

    if allow_smaller_final_batch:
      dequeued = queue.dequeue_up_to(batch_size, name=name)
    else:
      dequeued = queue.dequeue_many(batch_size, name=name)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    # tensors_list was validated to not be empty.
    return _as_original_type(tensors_list[0], dequeued)
920
921# Batching functions ----------------------------------------------------------
922
923
@tf_export(v1=["train.batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.batch(batch_size)` (or `padded_batch(...)` if "
    "`dynamic_pad=True`).")
def batch(tensors, batch_size, num_threads=1, capacity=32,
          enqueue_many=False, shapes=None, dynamic_pad=False,
          allow_smaller_final_batch=False, shared_name=None, name=None):
  """Creates batches of tensors in `tensors`.

  The argument `tensors` can be a list or a dictionary of tensors.
  The value returned by the function will be of the same type
  as `tensors`.

  This function is implemented using a queue. A `QueueRunner` for the
  queue is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  If `enqueue_many` is `False`, `tensors` is assumed to represent a single
  example.  An input tensor with shape `[x, y, z]` will be output as a tensor
  with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors` is assumed to represent a batch of
  examples, where the first dimension is indexed by example, and all members of
  `tensors` should have the same size in the first dimension.  If an input
  tensor has shape `[*, x, y, z]`, the output will have shape `[batch_size, x,
  y, z]`.  The `capacity` argument controls how long the prefetching is
  allowed to grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception, however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  *N.B.:* If `dynamic_pad` is `False`, you must ensure that either
  (i) the `shapes` argument is passed, or (ii) all of the tensors in
  `tensors` must have fully-defined shapes. `ValueError` will be
  raised if neither of these conditions holds.

  If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
  tensors is known, but individual dimensions may have shape `None`.
  In this case, for each enqueue the dimensions with value `None`
  may have a variable length; upon dequeue, the output tensors will be padded
  on the right to the maximum shape of the tensors in the current minibatch.
  For numbers, this padding takes value 0.  For strings, this padding is
  the empty string.  See `PaddingFIFOQueue` for more info.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch, otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property will have a first `Dimension` value of `None`, and
  operations that depend on fixed batch_size would fail.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    num_threads: The number of threads enqueuing `tensors`.  The batching will
      be nondeterministic if `num_threads > 1`.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors` is a batch of examples
      (indexed by its first dimension) rather than a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors` (except if
    the input is a list of one element, then it returns a tensor, not a list).

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`, or if eager execution is
      enabled.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _batch(
      tensors,
      batch_size,
      keep_input=True,
      num_threads=num_threads,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)
1021
1022
@tf_export(v1=["train.maybe_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.filter(...).batch(batch_size)` (or `padded_batch(...)`"
    " if `dynamic_pad=True`).")
def maybe_batch(tensors, keep_input, batch_size, num_threads=1, capacity=32,
                enqueue_many=False, shapes=None, dynamic_pad=False,
                allow_smaller_final_batch=False, shared_name=None, name=None):
  """Conditionally creates batches of tensors based on `keep_input`.

  See docstring in `batch` for more details.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates `True`, then
      `tensors` are all added to the queue. If it is a vector and `enqueue_many`
      is `True`, then each example is added to the queue only if the
      corresponding value in `keep_input` is `True`. This tensor essentially
      acts as a filtering mechanism.
    batch_size: The new batch size pulled from the queue.
    num_threads: The number of threads enqueuing `tensors`.  The batching will
      be nondeterministic if `num_threads > 1`.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors` is a batch of examples
      (indexed by its first dimension) rather than a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional). If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors` (except if
    the input is a list of one element, then it returns a tensor, not a list).

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`, or if eager execution is
      enabled.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _batch(
      tensors,
      batch_size,
      keep_input,
      num_threads=num_threads,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)
1078
1079
@tf_export(v1=["train.batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).batch(batch_size)` (or "
    "`padded_batch(...)` if `dynamic_pad=True`).")
def batch_join(tensors_list, batch_size, capacity=32, enqueue_many=False,
               shapes=None, dynamic_pad=False, allow_smaller_final_batch=False,
               shared_name=None, name=None):
  """Runs a list of tensors to fill a queue to create batches of examples.

  The `tensors_list` argument is a list of tuples of tensors, or a list of
  dictionaries of tensors.  Each element in the list is treated similarly
  to the `tensors` argument of `tf.compat.v1.train.batch()`.

  WARNING: This function is nondeterministic, since it starts a separate thread
  for each element of `tensors_list`.

  Enqueues a different list of tensors in different threads.
  Implemented using a queue -- a `QueueRunner` for the queue
  is added to the current `Graph`'s `QUEUE_RUNNER` collection.

  `len(tensors_list)` threads will be started,
  with thread `i` enqueuing the tensors from
  `tensors_list[i]`. `tensors_list[i1][j]` must match
  `tensors_list[i2][j]` in type and shape, except in the first
  dimension if `enqueue_many` is true.

  If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
  to represent a single example. An input tensor `x` will be output as a
  tensor with shape `[batch_size] + x.shape`.

  If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
  represent a batch of examples, where the first dimension is indexed
  by example, and all members of `tensors_list[i]` should have the
  same size in the first dimension.  The slices of any input tensor
  `x` are treated as examples, and the output tensors will have shape
  `[batch_size] + x.shape[1:]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception, however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  *N.B.:* If `dynamic_pad` is `False`, you must ensure that either
  (i) the `shapes` argument is passed, or (ii) all of the tensors in
  `tensors_list` must have fully-defined shapes. `ValueError` will be
  raised if neither of these conditions holds.

  If `dynamic_pad` is `True`, it is sufficient that the *rank* of the
  tensors is known, but individual dimensions may have value `None`.
  In this case, for each enqueue the dimensions with value `None`
  may have a variable length; upon dequeue, the output tensors will be padded
  on the right to the maximum shape of the tensors in the current minibatch.
  For numbers, this padding takes value 0.  For strings, this padding is
  the empty string.  See `PaddingFIFOQueue` for more info.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch, otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property will have a first `Dimension` value of `None`, and
  operations that depend on fixed batch_size would fail.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors_list[i]` is a batch of
      examples (indexed by its first dimension) rather than a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors_list[i]`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`, or if eager execution is
      enabled.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _batch_join(
      tensors_list,
      batch_size,
      keep_input=True,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)
1188
1189
@tf_export(v1=["train.maybe_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).filter(...).batch(batch_size)` (or "
    "`padded_batch(...)` if `dynamic_pad=True`).")
def maybe_batch_join(tensors_list, keep_input, batch_size, capacity=32,
                     enqueue_many=False, shapes=None, dynamic_pad=False,
                     allow_smaller_final_batch=False, shared_name=None,
                     name=None):
  """Runs a list of tensors to conditionally fill a queue to create batches.

  See docstring in `batch_join` for more details.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates `True`, then
      the tensors in `tensors_list` are all added to the queue. If it is a
      vector and `enqueue_many` is `True`, then each example is added to the
      queue only if the corresponding value in `keep_input` is `True`. This
      tensor essentially acts as a filtering mechanism.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    enqueue_many: Whether each tensor in `tensors_list[i]` is a batch of
      examples (indexed by its first dimension) rather than a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors_list[i]`.
    dynamic_pad: Boolean.  Allow variable dimensions in input shapes.
      The given dimensions are padded upon dequeue so that tensors within a
      batch have the same shapes.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`, or if eager execution is
      enabled.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _batch_join(
      tensors_list,
      batch_size,
      keep_input,
      capacity=capacity,
      enqueue_many=enqueue_many,
      shapes=shapes,
      dynamic_pad=dynamic_pad,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)
1245
1246
@tf_export(v1=["train.shuffle_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.shuffle(min_after_dequeue).batch(batch_size)`.")
def shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                  num_threads=1, seed=None, enqueue_many=False, shapes=None,
                  allow_smaller_final_batch=False, shared_name=None, name=None):
  """Creates batches by randomly shuffling tensors.

  This function adds the following to the current `Graph`:

  * A shuffling queue into which tensors from `tensors` are enqueued.
  * A `dequeue_many` operation to create batches from the queue.
  * A `QueueRunner` to `QUEUE_RUNNER` collection, to enqueue the tensors
    from `tensors`.

  If `enqueue_many` is `False`, `tensors` is assumed to represent a
  single example.  An input tensor with shape `[x, y, z]` will be output
  as a tensor with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors` is assumed to represent a
  batch of examples, where the first dimension is indexed by example,
  and all members of `tensors` should have the same size in the
  first dimension.  If an input tensor has shape `[*, x, y, z]`, the
  output will have shape `[batch_size, x, y, z]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception, however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  For example:

  ```python
  # Creates batches of 32 images and 32 labels.
  image_batch, label_batch = tf.compat.v1.train.shuffle_batch(
        [single_image, single_label],
        batch_size=32,
        num_threads=4,
        capacity=50000,
        min_after_dequeue=10000)
  ```

  *N.B.:* You must ensure that either (i) the `shapes` argument is
  passed, or (ii) all of the tensors in `tensors` must have
  fully-defined shapes. `ValueError` will be raised if neither of
  these conditions holds.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch, otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property will have a first `Dimension` value of `None`, and
  operations that depend on fixed batch_size would fail.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue. Must
      be strictly greater than `min_after_dequeue`.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    num_threads: The number of threads enqueuing `tensors`.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors` is a batch of examples
      (indexed by its first dimension) rather than a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`; if `capacity` is not greater
      than `min_after_dequeue`; or if eager execution is enabled.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch(
      tensors,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input=True,
      num_threads=num_threads,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)
1348
1349
@tf_export(v1=["train.maybe_shuffle_batch"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.filter(...).shuffle(min_after_dequeue).batch(batch_size)`"
    ".")
def maybe_shuffle_batch(tensors, batch_size, capacity, min_after_dequeue,
                        keep_input, num_threads=1, seed=None,
                        enqueue_many=False, shapes=None,
                        allow_smaller_final_batch=False, shared_name=None,
                        name=None):
  """Creates batches by randomly shuffling conditionally-enqueued tensors.

  See docstring in `shuffle_batch` for more details.

  Args:
    tensors: The list or dictionary of tensors to enqueue.
    batch_size: The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue. Must
      be strictly greater than `min_after_dequeue`.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates `True`, then
      `tensors` are all added to the queue. If it is a vector and `enqueue_many`
      is `True`, then each example is added to the queue only if the
      corresponding value in `keep_input` is `True`. This tensor essentially
      acts as a filtering mechanism.
    num_threads: The number of threads enqueuing `tensors`.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors` is a batch of examples
      (indexed by its first dimension) rather than a single example.
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same types as `tensors`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors`; if `capacity` is not greater
      than `min_after_dequeue`; or if eager execution is enabled.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  return _shuffle_batch(
      tensors,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input,
      num_threads=num_threads,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)
1412
1413
@tf_export(v1=["train.shuffle_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).shuffle(min_after_dequeue).batch"
    "(batch_size)`.")
def shuffle_batch_join(tensors_list, batch_size, capacity,
                       min_after_dequeue, seed=None, enqueue_many=False,
                       shapes=None, allow_smaller_final_batch=False,
                       shared_name=None, name=None):
  """Create batches by randomly shuffling tensors.

  The `tensors_list` argument is a list of tuples of tensors, or a list of
  dictionaries of tensors.  Each element in the list is treated similarly
  to the `tensors` argument of `tf.compat.v1.train.shuffle_batch()`.

  This version enqueues a different list of tensors in different threads.
  It adds the following to the current `Graph`:

  * A shuffling queue into which tensors from `tensors_list` are enqueued.
  * A `dequeue_many` operation to create batches from the queue.
  * A `QueueRunner` to `QUEUE_RUNNER` collection, to enqueue the tensors
    from `tensors_list`.

  `len(tensors_list)` threads will be started, with thread `i` enqueuing
  the tensors from `tensors_list[i]`. `tensors_list[i1][j]` must match
  `tensors_list[i2][j]` in type and shape, except in the first dimension if
  `enqueue_many` is true.

  If `enqueue_many` is `False`, each `tensors_list[i]` is assumed
  to represent a single example.  An input tensor with shape `[x, y, z]`
  will be output as a tensor with shape `[batch_size, x, y, z]`.

  If `enqueue_many` is `True`, `tensors_list[i]` is assumed to
  represent a batch of examples, where the first dimension is indexed
  by example, and all members of `tensors_list[i]` should have the
  same size in the first dimension.  If an input tensor has shape `[*, x,
  y, z]`, the output will have shape `[batch_size, x, y, z]`.

  The `capacity` argument controls how long the prefetching is allowed to
  grow the queues.

  The returned operation is a dequeue operation and will throw
  `tf.errors.OutOfRangeError` if the input queue is exhausted. If this
  operation is feeding another input queue, its queue runner will catch
  this exception, however, if this operation is used in your main thread
  you are responsible for catching this yourself.

  If `allow_smaller_final_batch` is `True`, a smaller batch value than
  `batch_size` is returned when the queue is closed and there are not enough
  elements to fill the batch, otherwise the pending elements are discarded.
  In addition, all output tensors' static shapes, as accessed via the
  `shape` property will have a first `Dimension` value of `None`, and
  operations that depend on fixed batch_size would fail.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a batch of
      examples (`True`) or a single example (`False`).
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors_list[i]`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  # Unconditional variant: every input is kept, so `keep_input` is fixed to
  # `True` (the `maybe_` variant forwards a caller-supplied predicate).
  return _shuffle_batch_join(
      tensors_list,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input=True,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)
1510
1511
@tf_export(v1=["train.maybe_shuffle_batch_join"])
@deprecation.deprecated(
    None, "Queue-based input pipelines have been replaced by `tf.data`. Use "
    "`tf.data.Dataset.interleave(...).filter(...).shuffle(min_after_dequeue)"
    ".batch(batch_size)`.")
def maybe_shuffle_batch_join(tensors_list, batch_size, capacity,
                             min_after_dequeue, keep_input, seed=None,
                             enqueue_many=False, shapes=None,
                             allow_smaller_final_batch=False, shared_name=None,
                             name=None):
  """Create batches by randomly shuffling conditionally-enqueued tensors.

  See docstring in `shuffle_batch_join` for more details.

  Args:
    tensors_list: A list of tuples or dictionaries of tensors to enqueue.
    batch_size: An integer. The new batch size pulled from the queue.
    capacity: An integer. The maximum number of elements in the queue.
    min_after_dequeue: Minimum number of elements in the queue after a
      dequeue, used to ensure a level of mixing of elements.
    keep_input: A `bool` Tensor.  This tensor controls whether the input is
      added to the queue or not.  If it is a scalar and evaluates `True`, then
      the tensors from `tensors_list` are all added to the queue. If it is a
      vector and `enqueue_many` is `True`, then each example is added to the
      queue only if the corresponding value in `keep_input` is `True`. This
      tensor essentially acts as a filtering mechanism.
    seed: Seed for the random shuffling within the queue.
    enqueue_many: Whether each tensor in `tensors_list` is a batch of
      examples (`True`) or a single example (`False`).
    shapes: (Optional) The shapes for each example.  Defaults to the
      inferred shapes for `tensors_list[i]`.
    allow_smaller_final_batch: (Optional) Boolean. If `True`, allow the final
      batch to be smaller if there are insufficient items left in the queue.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A list or dictionary of tensors with the same number and types as
    `tensors_list[i]`.

  Raises:
    ValueError: If the `shapes` are not specified, and cannot be
      inferred from the elements of `tensors_list`.

  @compatibility(eager)
  Input pipelines based on Queues are not supported when eager execution is
  enabled. Please use the `tf.data` API to ingest data under eager execution.
  @end_compatibility
  """
  # Thin public wrapper: forwards the caller-supplied `keep_input` filter
  # (`shuffle_batch_join` passes a constant `True` instead).
  return _shuffle_batch_join(
      tensors_list,
      batch_size,
      capacity,
      min_after_dequeue,
      keep_input,
      seed=seed,
      enqueue_many=enqueue_many,
      shapes=shapes,
      allow_smaller_final_batch=allow_smaller_final_batch,
      shared_name=shared_name,
      name=name)
1574